Add tests for CLI query schedule

This commit is contained in:
Anna Scholtz 2020-08-21 10:27:44 -07:00
Родитель cbf560a1fa
Коммит e1dab7de0f
3 изменённых файлов: 105 добавлений и 0 удалений

Просмотреть файл

@ -221,6 +221,10 @@ def schedule(path, dag, depends_on_past, task_name):
"For more information about scheduling queries see: "
"https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow"
)
# update dags since new task has been added
dags = get_dags(sql_dir, sql_dir.parent / "dags.yaml")
existing_dag = dags.dag_by_name(dag)
else:
if metadata.scheduling == {}:
click.echo(f"No scheduling information for: {path}", err=True)

Просмотреть файл

@ -214,6 +214,7 @@ class PublicDataJsonDag(Dag):
def to_airflow_dag(self, dag_collection):
"""Convert the DAG to its Airflow representation and return the python code."""
print("wwwaawa")
env = self._jinja_env()
dag_template = env.get_template(PUBLIC_DATA_JSON_DAG_TEMPLATE)
args = self.__dict__

Просмотреть файл

@ -1,6 +1,7 @@
import os
import pytest
from click.testing import CliRunner
import yaml
from bigquery_etl.cli.query import create, schedule
@ -73,3 +74,102 @@ class TestQuery:
"metadata.yaml",
"init.sql",
]
def test_schedule_invalid_path(self, runner):
with runner.isolated_filesystem():
result = runner.invoke(schedule, ["/test/query_v1"])
assert result.exit_code == 1
def test_schedule_invalid_query_path(self, runner):
with runner.isolated_filesystem():
os.mkdir("sql")
os.mkdir("sql/test")
os.mkdir("sql/query_v1")
result = runner.invoke(schedule, ["/test/query_v1"])
assert result.exit_code == 1
def test_schedule_query_non_existing_dag(self, runner):
with runner.isolated_filesystem():
os.mkdir("sql")
os.mkdir("sql/test")
os.mkdir("sql/test/query_v1")
open("sql/test/query_v1/query.sql", "a").close()
result = runner.invoke(schedule, ["sql/test/query_v1", "--dag=foo"])
assert result.exit_code == 1
def test_schedule_query(self, runner):
with runner.isolated_filesystem():
os.mkdir("sql")
os.mkdir("dags")
os.mkdir("sql/telemetry_derived")
os.mkdir("sql/telemetry_derived/query_v1")
with open("sql/telemetry_derived/query_v1/query.sql", "w") as f:
f.write("SELECT 1")
metadata_conf = {
"friendly_name": "test",
"description": "test",
"owners": ["test@example.org"],
}
with open("sql/telemetry_derived/query_v1/metadata.yaml", "w") as f:
f.write(yaml.dump(metadata_conf))
dag_conf = {
"bqetl_test": {
"schedule_interval": "daily",
"default_args": {
"owner": "test@example.com",
"start_date": "2020-03-29",
"email": ["test@example.org"],
"retries": 1,
},
}
}
with open("dags.yaml", "w") as f:
f.write(yaml.dump(dag_conf))
result = runner.invoke(
schedule, ["sql/telemetry_derived/query_v1", "--dag=bqetl_test"]
)
assert result.exit_code == 0
def test_reschedule_query(self, runner):
with runner.isolated_filesystem():
os.mkdir("sql")
os.mkdir("dags")
os.mkdir("sql/telemetry_derived")
os.mkdir("sql/telemetry_derived/query_v1")
with open("sql/telemetry_derived/query_v1/query.sql", "w") as f:
f.write("SELECT 1")
metadata_conf = {
"friendly_name": "test",
"description": "test",
"owners": ["test@example.org"],
"scheduling": {"dag_name": "bqetl_test"},
}
with open("sql/telemetry_derived/query_v1/metadata.yaml", "w") as f:
f.write(yaml.dump(metadata_conf))
dag_conf = {
"bqetl_test": {
"schedule_interval": "daily",
"default_args": {
"owner": "test@example.com",
"start_date": "2020-03-29",
"email": ["test@example.org"],
"retries": 1,
},
}
}
with open("dags.yaml", "w") as f:
f.write(yaml.dump(dag_conf))
result = runner.invoke(schedule, ["sql/telemetry_derived/query_v1"])
assert result.exit_code == 0