Extract Kubernetes command to separate file (#11669)

* Move Kubernetes command to separate file

* fixup! Move Kubernetes command to separate file
This commit is contained in:
Kamil Breguła 2020-10-20 23:32:19 +02:00 коммит произвёл GitHub
Родитель 3391c90247
Коммит 3a45f1f84d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 106 добавлений и 56 удалений

Просмотреть файл

@@ -1277,7 +1277,7 @@ KUBERNETES_COMMANDS = (
name='generate-dag-yaml',
help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
"launching into a cluster",
func=lazy_load_command('airflow.cli.commands.dag_command.generate_pod_yaml'),
func=lazy_load_command('airflow.cli.commands.kubernetes_command.generate_pod_yaml'),
args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH),
),
)

Просмотреть файл

@@ -19,13 +19,11 @@
import errno
import json
import logging
import os
import signal
import subprocess
import sys
from typing import List
import yaml
from graphviz.dot import Dot
from tabulate import tabulate
@@ -376,47 +374,6 @@ def dag_list_dag_runs(args, dag=None):
print(table)
@cli_utils.action_logging
def generate_pod_yaml(args):
    """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor

    Writes one ``<dag_id>_<task_id>_<date>.yml`` file per task into
    ``<args.output_path>/airflow_yaml_output/``, creating the directory if needed.
    """
    from kubernetes.client.api_client import ApiClient

    from airflow.executors.kubernetes_executor import KubeConfig, create_pod_id
    from airflow.kubernetes import pod_generator
    from airflow.kubernetes.pod_generator import PodGenerator
    from airflow.settings import pod_mutation_hook

    execution_date = args.execution_date
    dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
    yaml_output_path = args.output_path
    kube_config = KubeConfig()
    # Loop-invariant work hoisted out of the per-task loop: the serialization
    # client, the label-safe date string and the output directory are identical
    # for every task in the DAG.
    api_client = ApiClient()
    date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
    os.makedirs(os.path.dirname(yaml_output_path + "/airflow_yaml_output/"), exist_ok=True)
    for task in dag.tasks:
        ti = TaskInstance(task, execution_date)
        pod = PodGenerator.construct_pod(
            dag_id=args.dag_id,
            task_id=ti.task_id,
            pod_id=create_pod_id(args.dag_id, ti.task_id),
            try_number=ti.try_number,
            kube_image=kube_config.kube_image,
            date=ti.execution_date,
            command=ti.command_as_list(),
            pod_override_object=PodGenerator.from_obj(ti.executor_config),
            scheduler_job_id="worker-config",
            namespace=kube_config.executor_namespace,
            # Deliberately re-read per task: hoisting would only be safe if
            # construct_pod never mutates the base pod — TODO confirm.
            base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
        )
        # Apply the user-configured pod mutation hook before serializing.
        pod_mutation_hook(pod)
        yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
        with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name, "w") as output:
            sanitized_pod = api_client.sanitize_for_serialization(pod)
            output.write(yaml.dump(sanitized_pod))
    print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/")
@provide_session
@cli_utils.action_logging
def dag_test(args, session=None):

Просмотреть файл

@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Kubernetes sub-commands"""
import os
import yaml
from kubernetes.client.api_client import ApiClient
from airflow.executors.kubernetes_executor import KubeConfig, create_pod_id
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import TaskInstance
from airflow.settings import pod_mutation_hook
from airflow.utils import cli as cli_utils
from airflow.utils.cli import get_dag
@cli_utils.action_logging
def generate_pod_yaml(args):
    """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor

    Writes one ``<dag_id>_<task_id>_<date>.yml`` file per task into
    ``<args.output_path>/airflow_yaml_output/``, creating the directory if needed.
    """
    execution_date = args.execution_date
    dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
    yaml_output_path = args.output_path
    kube_config = KubeConfig()
    # Loop-invariant work hoisted out of the per-task loop: the serialization
    # client, the label-safe date string and the output directory are identical
    # for every task in the DAG.
    api_client = ApiClient()
    date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
    os.makedirs(os.path.dirname(yaml_output_path + "/airflow_yaml_output/"), exist_ok=True)
    for task in dag.tasks:
        ti = TaskInstance(task, execution_date)
        pod = PodGenerator.construct_pod(
            dag_id=args.dag_id,
            task_id=ti.task_id,
            pod_id=create_pod_id(args.dag_id, ti.task_id),
            try_number=ti.try_number,
            kube_image=kube_config.kube_image,
            date=ti.execution_date,
            command=ti.command_as_list(),
            pod_override_object=PodGenerator.from_obj(ti.executor_config),
            scheduler_job_id="worker-config",
            namespace=kube_config.executor_namespace,
            # Deliberately re-read per task: hoisting would only be safe if
            # construct_pod never mutates the base pod — TODO confirm.
            base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
        )
        # Apply the user-configured pod mutation hook before serializing.
        pod_mutation_hook(pod)
        yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
        with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name, "w") as output:
            sanitized_pod = api_client.sanitize_for_serialization(pod)
            output.write(yaml.dump(sanitized_pod))
    print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/")

Просмотреть файл

@@ -138,18 +138,6 @@ class TestCliDags(unittest.TestCase):
self.assertIn("graph [label=example_bash_operator labelloc=t rankdir=LR]", out)
self.assertIn("runme_2 -> run_after_loop", out)
def test_generate_dag_yaml(self):
    """Smoke test for generate-dag-yaml: one non-empty YAML file per task."""
    expected_yaml = "example_bash_operator_run_after_loop_2020-11-03T00_00_00_plus_00_00.yml"
    with tempfile.TemporaryDirectory("airflow_dry_run_test/") as tmp_dir:
        cli_args = self.parser.parse_args([
            'kubernetes', 'generate-dag-yaml',
            'example_bash_operator', "2020-11-03", "--output-path", tmp_dir])
        dag_command.generate_pod_yaml(cli_args)
        # The command should create exactly one entry in the output path:
        # the airflow_yaml_output subdirectory.
        self.assertEqual(len(os.listdir(tmp_dir)), 1)
        out_dir = tmp_dir + "/airflow_yaml_output/"
        # Six generated pod files are expected for this DAG.
        self.assertEqual(len(os.listdir(out_dir)), 6)
        generated = out_dir + expected_yaml
        self.assertTrue(os.path.isfile(generated))
        self.assertGreater(os.stat(generated).st_size, 0)
@mock.patch("airflow.cli.commands.dag_command.render_dag")
def test_show_dag_dave(self, mock_render_dag):
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:

Просмотреть файл

@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import tempfile
import unittest
from airflow.cli import cli_parser
from airflow.cli.commands import kubernetes_command
class TestGeneratteDagYamlCommand(unittest.TestCase):
    # NOTE(review): the class name carries a typo ("Generatte"); kept as-is so
    # test selection by name keeps working.

    @classmethod
    def setUpClass(cls):
        # Build the CLI parser once for the whole test case.
        cls.parser = cli_parser.get_parser()

    def test_generate_dag_yaml(self):
        """generate-dag-yaml should emit one non-empty YAML file per task."""
        expected_file = "example_bash_operator_run_after_loop_2020-11-03T00_00_00_plus_00_00.yml"
        with tempfile.TemporaryDirectory("airflow_dry_run_test/") as directory:
            parsed_args = self.parser.parse_args([
                'kubernetes', 'generate-dag-yaml',
                'example_bash_operator', "2020-11-03", "--output-path", directory])
            kubernetes_command.generate_pod_yaml(parsed_args)
            # The output path should contain exactly the airflow_yaml_output dir.
            self.assertEqual(len(os.listdir(directory)), 1)
            out_dir = directory + "/airflow_yaml_output/"
            # Six generated pod files are expected for this DAG.
            self.assertEqual(len(os.listdir(out_dir)), 6)
            generated = out_dir + expected_file
            self.assertTrue(os.path.isfile(generated))
            self.assertGreater(os.stat(generated).st_size, 0)