Bug 1709871 - Support private DAGs in the CLI (#2081)

This will allow creating DAGs in the private version of bigquery-etl repository.

 Changes:
 * support private_bqetl_ prefix in CLI
 * add test to validate if all DAGs here (public repo) start with `bqetl_` (similar test will be added in private repo requiring `private_bqetl_` prefix)
This commit is contained in:
akkomar 2021-05-28 19:01:36 +02:00 коммит произвёл GitHub
Родитель 288e0a687b
Коммит 7cded347a6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 28 добавлений и 3 удалений

Просмотреть файл

@ -123,7 +123,8 @@ class Dag:
"""Validate the DAG name."""
if not is_valid_dag_name(value):
raise ValueError(
f"Invalid DAG name {value}. Name must start with 'bqetl_'."
f"Invalid DAG name {value}. Name must start with 'bqetl_' "
f"or 'private_bqetl_'."
)
@tasks.validator

Просмотреть файл

@ -187,7 +187,8 @@ class Task:
"""Validate the DAG name."""
if not is_valid_dag_name(value):
raise ValueError(
f"Invalid DAG name {value} for task. Name must start with 'bqetl_'."
f"Invalid DAG name {value} for task. Name must start with 'bqetl_' "
f"or 'private_bqetl_'."
)
@task_name.validator

Просмотреть файл

@ -30,7 +30,7 @@ def is_email(s):
return re.match(r"[^@]+@[^@]+\.[^@]+", s)
DAG_NAME_RE = re.compile("^bqetl_.+$")
DAG_NAME_RE = re.compile("^(private_)?bqetl_.+$")
def is_valid_dag_name(name):

Просмотреть файл

@ -0,0 +1,18 @@
from pathlib import Path
import yaml
ROOT_DIR = Path(__file__).parent.parent.parent
def test_dag_names():
"""Verify dag names.
DAG names defined in this repository must start with `bqetl_`.
CLI allows to create DAGs prefixed with `bqetl_` and `private_bqetl_`,
the latter are reserved for private version of bigquery-etl repository.
"""
with open(str(ROOT_DIR / "dags.yaml"), "r") as dags_file:
dags_conf = yaml.safe_load(dags_file.read())
for dag_name in dags_conf:
assert dag_name.startswith("bqetl_")

Просмотреть файл

@ -121,6 +121,11 @@ class TestDag:
with pytest.raises(ValueError):
Dag("test_dag", "daily", self.default_args)
def test_private_dag_name(self):
dag = Dag("private_bqetl_test_dag", "daily", self.default_args)
assert dag.name == "private_bqetl_test_dag"
def test_schedule_interval_format(self):
assert Dag("bqetl_test_dag", "daily", self.default_args)
assert Dag("bqetl_test_dag", "weekly", self.default_args)