This commit is contained in:
Anna Scholtz 2020-05-14 16:40:02 -07:00
Родитель 2a56ff5060
Коммит 49c1dd981e
4 изменённых файлов: 96 добавлений и 52 удалений

Просмотреть файл

@ -3,6 +3,7 @@
import attr
import cattr
from jinja2 import Environment, PackageLoader
import re
from typing import List
from bigquery_etl.query_scheduling.task import Task
@ -46,12 +47,34 @@ class DagDefaultArgs:
class Dag:
"""Representation of a DAG configuration."""
name: str
schedule_interval: str
name: str = attr.ib()
schedule_interval: str = attr.ib()
default_args: DagDefaultArgs
tasks: List[Task] = []
# todo validate dag naming
@name.validator
def validate_dag_name(self, attribute, value):
    """
    Check that the DAG name carries the required prefix.

    The name must consist of 'bqetl_' followed by at least one more
    character; anything else is rejected with a ValueError.
    """
    if re.match("^bqetl_.+$", value) is None:
        raise ValueError(
            f"Invalid DAG name {value}. Name must start with 'bqetl_'."
        )
@schedule_interval.validator
def validate_schedule_interval(self, attribute, value):
    """
    Validate the schedule_interval format.

    Accepted values are either one of the preset names
    once, hourly, daily, weekly, monthly, yearly (written without a
    leading "@"), or a CRON-style expression made of 5 to 7 fields
    separated by single spaces. Each field is a number, a comma-separated
    list of numbers, two numbers joined by "-" or "/" (e.g. "1-5",
    "0/15"), "*", or a step over all values (e.g. "*/5").

    Raises ValueError if the value matches neither form. A non-string
    value is rejected by the regex engine itself with a TypeError.
    """
    # https://stackoverflow.com/questions/14203122/create-a-regular-expression-for-cron-statement
    field = r"(?:(?:\d+,)+\d+|\d+[/-]\d+|\d+|\*/\d+|\*)"
    # Fields must be separated by a space. With an optional separator a
    # multi-digit number can be split into several "fields", so a
    # four-field expression such as "10 2 * *" would pass as five.
    pattern = re.compile(
        r"^(?:once|hourly|daily|weekly|monthly|yearly|"
        + field
        + r"(?: "
        + field
        + r"){4,6})$"
    )
    if not pattern.match(value):
        raise ValueError(f"Invalid schedule_interval {value}.")
def add_tasks(self, tasks):
"""Add tasks to be scheduled as part of the DAG."""

Просмотреть файл

@ -1,10 +1,8 @@
"""Generates Airflow DAGs for scheduled queries."""
import glob
import logging
import os
from git import Repo
from airflow.models import DagBag
from argparse import ArgumentParser
from google.cloud import bigquery
from ..util import standard_args
@ -50,7 +48,7 @@ parser.add_argument(
standard_args.add_log_level(parser)
def setup_telemetry_airflow(local_dags_dir):
def setup_telemetry_airflow():
"""
Download the telemetry-airflow repository to a temporary directory and
copy generated DAGs to dags/ folder.
@ -65,28 +63,9 @@ def setup_telemetry_airflow(local_dags_dir):
Repo.clone_from(TELEMETRY_AIRFLOW_GITHUB, tmp_dir)
airflow_dag_dir = tmp_dir + "/dags"
# remove existing DAGs to speed up testing
filelist = glob.glob(os.path.join(airflow_dag_dir, "*.py"))
for f in filelist:
os.remove(f)
# for filename in glob.glob(os.path.join(local_dags_dir, "*.py")):
# shutil.copy(filename, airflow_dag_dir)
return airflow_dag_dir
def validate_airflow_dag(dag_dir, dags_collection):
"""Validate generated Airflow DAGs."""
dagbag = DagBag(dag_dir)
for dag in dags_collection.dags:
airflow_dag = dagbag.get_dag(dag_id=dag.name)
assert dagbag.import_errors == {}
assert airflow_dag is not None
def get_dags(sql_dir, dags_config):
"""Return all configured DAGs including associated tasks."""
tasks = []
@ -133,8 +112,7 @@ def main():
dags = get_dags(args.sql_dir, args.dags_config)
dags.to_airflow_dags(dags_output_dir, client)
airflow_dag_dir = setup_telemetry_airflow(dags_output_dir)
validate_airflow_dag(args.output_dir, dags)
setup_telemetry_airflow()
if __name__ == "__main__":

Просмотреть файл

@ -15,15 +15,15 @@ class TestDag:
}
def test_dag_instantiation(self):
dag = Dag("test_dag", "daily", self.default_args)
dag = Dag("bqetl_test_dag", "daily", self.default_args)
assert dag.name == "test_dag"
assert dag.name == "bqetl_test_dag"
assert dag.schedule_interval == "daily"
assert dag.tasks == []
assert dag.default_args == self.default_args
def test_add_tasks(self):
dag = Dag("test_dag", "daily", self.default_args)
dag = Dag("bqetl_test_dag", "daily", self.default_args)
query_file = (
TEST_DIR
@ -45,7 +45,7 @@ class TestDag:
def test_from_dict(self):
dag = Dag.from_dict(
{
"test_dag": {
"bqetl_test_dag": {
"schedule_interval": "daily",
"default_args": {
"owner": "test@example.com",
@ -55,7 +55,7 @@ class TestDag:
}
)
assert dag.name == "test_dag"
assert dag.name == "bqetl_test_dag"
assert dag.schedule_interval == "daily"
assert dag.default_args.owner == "test@example.com"
assert dag.default_args.email == ["test@example.com"]
@ -67,8 +67,26 @@ class TestDag:
def test_from_dict_multiple_dags(self):
with pytest.raises(DagParseException):
Dag.from_dict({"test_dag1": {}, "test_dag2": {}})
Dag.from_dict({"bqetl_test_dag1": {}, "bqetl_test_dag2": {}})
def test_from_dict_without_scheduling_interval(self):
with pytest.raises(DagParseException):
Dag.from_dict({"test_dag": {}})
Dag.from_dict({"bqetl_test_dag": {}})
def test_invalid_dag_name(self):
    """A DAG name without the 'bqetl_' prefix must raise ValueError."""
    unprefixed_name = "test_dag"
    with pytest.raises(ValueError):
        Dag(unprefixed_name, "daily", self.default_args)
def test_schedule_interval_format(self):
    """Exercise accepted and rejected schedule_interval values."""

    def build(interval):
        # Every case uses the same valid name and default args; only the
        # schedule interval varies.
        return Dag("bqetl_test_dag", interval, self.default_args)

    # Preset names are accepted.
    for preset in ("daily", "weekly", "once"):
        assert build(preset)

    # An unknown preset name is rejected.
    with pytest.raises(ValueError):
        build("never")

    # Five-field cron expressions are accepted.
    for cron_expression in ("* * * * *", "1 * * * *"):
        assert build(cron_expression)

    # A non-string interval surfaces as a TypeError.
    with pytest.raises(TypeError):
        build(323)

Просмотреть файл

@ -11,6 +11,12 @@ TEST_DIR = Path(__file__).parent.parent
class TestDagCollection:
default_args = {
"owner": "test@example.org",
"email": ["test@example.org"],
"depends_on_past": False,
}
def test_dags_from_file(self):
dags_file = TEST_DIR / "data" / "dags.yaml"
dags = DagCollection.from_file(dags_file)
@ -36,24 +42,27 @@ class TestDagCollection:
def test_dags_from_dict(self):
dags = DagCollection.from_dict(
{
"test_dag1": {
"bqetl_test_dag1": {
"schedule_interval": "daily",
"default_args": {"owner": "test@example.org"},
"default_args": self.default_args,
},
"bqetl_test_dag2": {
"schedule_interval": "daily",
"default_args": self.default_args,
},
"test_dag2": {"schedule_interval": "daily", "default_args": {}},
}
)
assert len(dags.dags) == 2
assert dags.dag_by_name("test_dag1") is not None
assert dags.dag_by_name("test_dag2") is not None
assert dags.dag_by_name("bqetl_test_dag1") is not None
assert dags.dag_by_name("bqetl_test_dag2") is not None
dag1 = dags.dag_by_name("test_dag1")
dag1 = dags.dag_by_name("bqetl_test_dag1")
assert len(dag1.tasks) == 0
assert dag1.schedule_interval == "daily"
assert dag1.default_args == {"owner": "test@example.org"}
dag2 = dags.dag_by_name("test_dag2")
dag2 = dags.dag_by_name("bqetl_test_dag2")
assert len(dag2.tasks) == 0
assert dag2.schedule_interval == "daily"
assert dag2.default_args == {}
@ -68,7 +77,12 @@ class TestDagCollection:
def test_dag_by_name(self):
dags = DagCollection.from_dict(
{"test_dag1": {"schedule_interval": "daily", "default_args": {}}}
{
"bqetl_test_dag1": {
"schedule_interval": "daily",
"default_args": self.default_args,
}
}
)
assert dags.dag_by_name("test_dag1") is not None
@ -86,23 +100,25 @@ class TestDagCollection:
)
metadata = Metadata(
"test",
"test",
{},
{"dag_name": "test_dag", "depends_on_past": True, "param": "test_param"},
"test", "test", {}, {"dag_name": "bqetl_test_dag", "depends_on_past": True}
)
tasks = [Task.of_query(query_file, metadata)]
dags = DagCollection.from_dict(
{"test_dag": {"schedule_interval": "daily", "default_args": {}}}
{
"bqetl_test_dag": {
"schedule_interval": "daily",
"default_args": self.default_args,
}
}
).with_tasks(tasks)
assert len(dags.dags) == 1
dag = dags.dag_by_name("test_dag")
dag = dags.dag_by_name("bqetl_test_dag")
assert len(dag.tasks) == 1
assert dag.tasks[0].dag_name == "test_dag"
assert dag.tasks[0].dag_name == "bqetl_test_dag"
def test_dags_with_invalid_tasks(self):
with pytest.raises(InvalidDag):
@ -120,7 +136,7 @@ class TestDagCollection:
"test",
{},
{
"dag_name": "non_exisiting_dag",
"dag_name": "bqetl_non_exisiting_dag",
"depends_on_past": True,
"param": "test_param",
},
@ -129,7 +145,12 @@ class TestDagCollection:
tasks = [Task.of_query(query_file, metadata)]
DagCollection.from_dict(
{"test_dag": {"schedule_interval": "daily", "default_args": {}}}
{
"bqetl_test_dag": {
"schedule_interval": "daily",
"default_args": self.default_args,
}
}
).with_tasks(tasks)
@pytest.mark.integration
@ -147,7 +168,11 @@ class TestDagCollection:
"test",
"test",
{},
{"dag_name": "test_dag", "depends_on_past": True, "param": "test_param"},
{
"dag_name": "bqetl_test_dag",
"depends_on_past": True,
"param": "test_param",
},
)
tasks = [Task.of_query(query_file, metadata)]