2020-08-24 20:08:44 +03:00
|
|
|
import os
|
|
|
|
from pathlib import Path
|
2021-02-25 01:11:52 +03:00
|
|
|
|
|
|
|
import pytest
|
2020-08-24 20:08:44 +03:00
|
|
|
import yaml
|
2021-02-25 01:11:52 +03:00
|
|
|
from click.testing import CliRunner
|
2020-08-24 20:08:44 +03:00
|
|
|
|
2022-04-07 19:32:51 +03:00
|
|
|
from bigquery_etl.cli.dag import create, generate, info, remove
|
2020-08-24 20:08:44 +03:00
|
|
|
|
|
|
|
# Directory two levels above this file; tests below read fixtures from TEST_DIR / "data".
TEST_DIR = Path(__file__).parent.parent
|
|
|
|
|
|
|
|
|
|
|
|
class TestDag:
    """Tests for the ``info``, ``create``, ``remove`` and ``generate`` DAG commands.

    Every command is invoked through click's ``CliRunner``. Tests that create
    files do so inside ``runner.isolated_filesystem()`` and use relative paths.
    """

    @pytest.fixture
    def runner(self):
        """Return a new ``CliRunner`` used to invoke the commands under test."""
        return CliRunner()

    @staticmethod
    def _dags_conf(*dag_names, tags=None):
        """Build a ``dags.yaml`` mapping with one daily DAG entry per name.

        Each entry is a freshly built dict (and a fresh ``tags`` list) so that
        ``yaml.dump`` never sees a shared object and emits no anchors/aliases.
        """
        conf = {}
        for dag_name in dag_names:
            entry = {
                "schedule_interval": "daily",
                "default_args": {
                    "owner": "test@example.org",
                    "start_date": "2020-01-01",
                },
            }
            if tags is not None:
                entry["tags"] = list(tags)
            conf[dag_name] = entry
        return conf

    @staticmethod
    def _create_args(dag_name):
        """Return the argument list passed to ``create`` for ``dag_name``."""
        return [
            dag_name,
            "--schedule_interval=daily",
            "--owner=test@example.org",
            "--description=test",
            "--start_date=2020-01-01",
            "--tag=impact/tier_3",
        ]

    @staticmethod
    def _write_yaml(path, data):
        """Serialize ``data`` with ``yaml.dump`` and write it to ``path``."""
        with open(path, "w") as f:
            f.write(yaml.dump(data))

    @staticmethod
    def _read_yaml(path):
        """Load and return the YAML document stored at ``path``."""
        with open(path, "r") as f:
            return yaml.safe_load(f)

    def test_dag_info_without_tasks(self, runner):
        """``info`` on the fixture config succeeds and mentions both fixture DAGs."""
        result = runner.invoke(
            info, ["--dags_config=" + str(TEST_DIR / "data" / "dags.yaml")]
        )
        assert result.exit_code == 0, result.output
        assert "bqetl_core" in result.output
        assert "bqetl_events" in result.output

    def test_dag_info_with_tasks(self, runner):
        """``info --with_tasks`` exits with status 0."""
        # NOTE(review): unlike the other info tests, no --dags_config is passed,
        # so the command uses whatever default it defines - confirm intentional.
        result = runner.invoke(info, ["--with_tasks"])
        assert result.exit_code == 0, result.output

    def test_single_dag_info(self, runner):
        """``info bqetl_core`` reports that DAG and omits ``bqetl_events``."""
        result = runner.invoke(
            info,
            ["bqetl_core", "--dags_config=" + str(TEST_DIR / "data" / "dags.yaml")],
        )
        assert result.exit_code == 0, result.output
        assert "bqetl_core" in result.output
        assert "bqetl_events" not in result.output

    def test_dag_create_invalid_name(self, runner):
        """``create`` exits with status 1 for the name ``invalid_dag_name``."""
        # NOTE(review): presumably rejected for lacking the "bqetl_" prefix that
        # the accepted name in test_dag_create has - confirm against the command.
        with runner.isolated_filesystem():
            self._write_yaml(
                "dags.yaml", self._dags_conf("bqetl_test", tags=["impact/tier_3"])
            )

            result = runner.invoke(create, self._create_args("invalid_dag_name"))
            assert result.exit_code == 1, result.output

    def test_dag_create(self, runner):
        """``create`` adds the new DAG to ``dags.yaml`` and keeps the existing one."""
        with runner.isolated_filesystem():
            self._write_yaml(
                "dags.yaml", self._dags_conf("bqetl_test", tags=["impact/tier_3"])
            )

            result = runner.invoke(create, self._create_args("bqetl_new_dag"))
            assert result.exit_code == 0, result.output

            dags_conf = self._read_yaml("dags.yaml")
            assert "bqetl_new_dag" in dags_conf
            assert "bqetl_test" in dags_conf

            new_dag = dags_conf["bqetl_new_dag"]
            assert new_dag["default_args"]["owner"] == "test@example.org"
            assert new_dag["description"] == "test"
            assert new_dag["default_args"]["start_date"] == "2020-01-01"
            assert new_dag["schedule_interval"] == "daily"

    def test_dag_remove_non_existing_dag(self, runner):
        """``remove`` exits with status 1 for a DAG missing from ``dags.yaml``."""
        with runner.isolated_filesystem():
            os.makedirs("sql/moz-fx-data-shared-prod")
            os.mkdir("dags")
            self._write_yaml(
                "dags.yaml", self._dags_conf("bqetl_test", "bqetl_test_2")
            )

            result = runner.invoke(remove, ["non_existing_dag"])
            assert result.exit_code == 1, result.output

    def test_dag_remove(self, runner):
        """``remove`` deletes the DAG file, its config entry and query scheduling."""
        query_dir = "sql/moz-fx-data-shared-prod/telemetry_derived/query_v1"
        metadata_path = query_dir + "/metadata.yaml"

        with runner.isolated_filesystem():
            self._write_yaml(
                "dags.yaml", self._dags_conf("bqetl_test", "bqetl_test_2")
            )

            # A query scheduled in the DAG that is about to be removed.
            os.makedirs(query_dir)
            with open(query_dir + "/query.sql", "w") as f:
                f.write("SELECT 1")
            self._write_yaml(
                metadata_path,
                {
                    "friendly_name": "test",
                    "description": "test",
                    "owners": ["test@example.org"],
                    "scheduling": {"dag_name": "bqetl_test"},
                },
            )

            # Empty placeholder standing in for the DAG's Python file.
            os.mkdir("dags")
            with open("dags/bqetl_test.py", "w") as f:
                f.write("")

            result = runner.invoke(remove, ["bqetl_test"])
            assert result.exit_code == 0, result.output
            assert os.listdir("dags") == []

            dags_conf = self._read_yaml("dags.yaml")
            assert "bqetl_test_2" in dags_conf
            assert "bqetl_test" not in dags_conf

            metadata = self._read_yaml(metadata_path)
            assert "scheduling" not in metadata

    def test_dag_generate_without_any_tasks(self, runner):
        """``generate`` exits with status 0 when the SQL directory holds no queries."""
        with runner.isolated_filesystem():
            os.makedirs("sql/moz-fx-data-shared-prod")
            os.mkdir("dags")
            self._write_yaml("dags.yaml", self._dags_conf("bqetl_test"))

            result = runner.invoke(generate, ["bqetl_test"])
            assert result.exit_code == 0, result.output
|