[AIRFLOW-2006] Add local log catching to kubernetes operator

Closes #2947 from dimberman/AIRFLOW-2006-kubernetes-log-aggregation
This commit is contained in:
Daniel Imberman 2018-01-25 22:37:50 +01:00 коммит произвёл Bolke de Bruin
Родитель 3dbfdafd7a
Коммит 55f2674925
6 изменённых файлов: 83 добавлений и 10 удалений

Просмотреть файл

@ -54,7 +54,7 @@ class PodLauncher(LoggingMixin):
raise
return resp
def run_pod(self, pod, startup_timeout=120):
def run_pod(self, pod, startup_timeout=120, get_logs=True):
# type: (Pod) -> State
"""
Launches the pod synchronously and waits for completion.
@ -74,15 +74,25 @@ class PodLauncher(LoggingMixin):
time.sleep(1)
self.log.debug('Pod not yet started')
final_status = self._monitor_pod(pod)
final_status = self._monitor_pod(pod, get_logs)
return final_status
def _monitor_pod(self, pod, get_logs):
    # type: (Pod, bool) -> State
    """
    Wait for the pod to stop running and return its resulting state.

    :param pod: the pod to watch; its ``name`` and ``namespace`` are used
        to request its logs
    :param get_logs: when true, follow the pod's log stream and forward
        every received line to this launcher's logger at INFO level
    :return: whatever ``_task_status`` reports for the pod as re-read once
        it is no longer running
    """
    if get_logs:
        # _preload_content=False makes the client hand back a streaming
        # response, so lines can be forwarded as they arrive.
        logs = self._client.read_namespaced_pod_log(
            name=pod.name,
            namespace=pod.namespace,
            follow=True,
            tail_lines=10,
            _preload_content=False)
        for line in logs:
            self.log.info(line)

    # The followed log stream may end before the pod has actually left the
    # running state, so always poll here instead of only when logs are off;
    # otherwise the status read below can still be "running".
    while self.pod_is_running(pod):
        self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
        time.sleep(2)
    return self._task_status(self.read_pod(pod))
def _task_status(self, event):

Просмотреть файл

@ -43,12 +43,16 @@ class KubernetesPodOperator(BaseOperator):
)
launcher = pod_launcher.PodLauncher(client)
final_state = launcher.run_pod(pod, self.startup_timeout_seconds)
final_state = launcher.run_pod(
pod,
startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
if final_state != State.SUCCESS:
raise AirflowException("Pod returned a failure")
except AirflowException as ex:
raise AirflowException("Pod Launching failed: {error}".format(error=ex))
@apply_defaults
def __init__(self,
namespace,
@ -60,6 +64,7 @@ class KubernetesPodOperator(BaseOperator):
labels=None,
startup_timeout_seconds=120,
kube_executor_config=None,
get_logs=True,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@ -72,3 +77,4 @@ class KubernetesPodOperator(BaseOperator):
self.startup_timeout_seconds = startup_timeout_seconds
self.name = name
self.in_cluster = in_cluster
self.get_logs = get_logs

Просмотреть файл

@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import airflow
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import DAG
# Arguments handed to the DAG below as its ``default_args``.
args = dict(
    owner='airflow',
    start_date=airflow.utils.dates.days_ago(2),
)

# The DAG object itself; it is created with ``schedule_interval=None``.
dag = DAG(dag_id='example_kubernetes_operator',
          default_args=args,
          schedule_interval=None)

# Single task: start an ubuntu:16.04 pod in the "default" namespace and
# request that its logs be fetched (``get_logs=True``).
# NOTE(review): with ``bash -cx`` only the first argument ("echo") should be
# taken as the command string and "10" would become $0 -- confirm that this
# is the intended command.
k = KubernetesPodOperator(
    task_id="task",
    name="airflow-test-pod",
    namespace='default',
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    in_cluster=False,
    get_logs=True,
    dag=dag,
)

Просмотреть файл

@ -19,7 +19,8 @@ import unittest
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import AirflowException
from subprocess import check_call
import mock
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
try:
check_call(["kubectl", "get", "pods"])
@ -31,7 +32,6 @@ except Exception as e:
class KubernetesPodOperatorTest(unittest.TestCase):
def test_working_pod(self):
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
@ -44,6 +44,20 @@ class KubernetesPodOperatorTest(unittest.TestCase):
k.execute(None)
def test_logging(self):
    """Run a pod with ``get_logs=True`` and check a log line reaches the launcher's logger."""
    operator_kwargs = {
        "namespace": 'default',
        "image": "ubuntu:16.04",
        "cmds": ["bash", "-cx"],
        "arguments": ["echo", "10"],
        "labels": {"foo": "bar"},
        "name": "test",
        "task_id": "task",
        "get_logs": True,
    }
    # Replace PodLauncher.log for the duration of the run so the calls made
    # on it can be inspected afterwards.
    with mock.patch.object(PodLauncher, 'log') as patched_log:
        operator = KubernetesPodOperator(**operator_kwargs)
        operator.execute(None)
        # "+ echo\n" is the line the test expects to have been logged at INFO.
        patched_log.info.assert_any_call("+ echo\n")
def test_faulty_image(self):
bad_image_name = "foobar"
k = KubernetesPodOperator(namespace='default',

Просмотреть файл

@ -66,7 +66,7 @@ from jinja2 import UndefinedError
import six
NUM_EXAMPLE_DAGS = 18
NUM_EXAMPLE_DAGS = 19
DEV_NULL = '/dev/null'
TEST_DAG_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')

Просмотреть файл

@ -156,6 +156,7 @@ class BackfillJobTest(unittest.TestCase):
'example_trigger_target_dag',
'example_trigger_controller_dag', # tested above
'test_utils', # sleeps forever
'example_kubernetes_operator', # only works with k8s cluster
]
logger = logging.getLogger('BackfillJobTest.test_backfill_examples')