CLI allow info for specific DAG
This commit is contained in:
Родитель
8998d88d50
Коммит
48164652af
|
@ -47,6 +47,7 @@ def dag():
|
|||
|
||||
|
||||
@dag.command(help="List all available DAGs",)
|
||||
@click.argument("name", required=False)
|
||||
@sql_dir_option
|
||||
@dags_config_option
|
||||
@click.option(
|
||||
|
@ -57,14 +58,21 @@ def dag():
|
|||
default=False,
|
||||
is_flag=True,
|
||||
)
|
||||
def info(dags_config, sql_dir, with_tasks):
|
||||
def info(name, dags_config, sql_dir, with_tasks):
|
||||
"""List available DAG information."""
|
||||
if with_tasks:
|
||||
dag_collection = get_dags(sql_dir, dags_config)
|
||||
else:
|
||||
dag_collection = DagCollection.from_file(dags_config)
|
||||
|
||||
sorted_dags = sorted(dag_collection.dags, key=lambda d: d.name)
|
||||
if name:
|
||||
dag = dag_collection.dag_by_name(name)
|
||||
if not dag:
|
||||
click.echo(f"DAG {name} does not exist", err=True)
|
||||
sys.exit(1)
|
||||
sorted_dags = [dag]
|
||||
else:
|
||||
sorted_dags = sorted(dag_collection.dags, key=lambda d: d.name)
|
||||
|
||||
for dag in sorted_dags:
|
||||
click.secho(click.style(dag.name, bold=True))
|
||||
|
|
|
@ -193,7 +193,7 @@ def schedule(path, dag, depends_on_past, task_name):
|
|||
click.echo(
|
||||
(
|
||||
f"DAG {dag} does not exist. "
|
||||
"To see available DAGs run `bqetl dag list`. "
|
||||
"To see available DAGs run `bqetl dag info`. "
|
||||
"To create a new DAG run `bqetl dag create`."
|
||||
),
|
||||
err=True,
|
||||
|
|
|
@ -4,7 +4,7 @@ from click.testing import CliRunner
|
|||
from pathlib import Path
|
||||
import yaml
|
||||
|
||||
from bigquery_etl.cli.dag import info, generate, create, remove
|
||||
from bigquery_etl.cli.dag import info, create, remove
|
||||
|
||||
TEST_DIR = Path(__file__).parent.parent
|
||||
|
||||
|
@ -37,6 +37,15 @@ class TestDag:
|
|||
assert "test.multipart_query_v1" in result.output
|
||||
assert "test.incremental_query_non_incremental_export_v1" in result.output
|
||||
|
||||
def test_single_dag_info(self, runner):
|
||||
result = runner.invoke(
|
||||
info,
|
||||
["bqetl_core", "--dags_config=" + str(TEST_DIR / "data" / "dags.yaml")],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "bqetl_core" in result.output
|
||||
assert "bqetl_events" not in result.output
|
||||
|
||||
def test_dag_create_invalid_name(self, runner):
|
||||
with runner.isolated_filesystem():
|
||||
dags_conf = {
|
||||
|
|
Загрузка…
Ссылка в новой задаче