Add integration test for determining multiple task dependencies

Parent: e5882d8058
Commit: 9893c58cd0
@@ -55,8 +55,7 @@ class DagCollection:
         for dag in self.dags:
             for task in dag.tasks:
-                if dataset == task.dataset and table == f"{task.table}_{task.version}":
+                table_name = f"{task.table}_{task.version}"
+                if task.dataset == dataset and table_name == table:
                     return task

         return None
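Note: the lookup this hunk implements can be exercised in isolation. The sketch below is illustrative only (FakeDag and FakeTask are stand-ins, not repository classes); it assumes tasks expose dataset, table, and version attributes, as the hunk does:

    from collections import namedtuple

    # Illustrative stand-ins for the real DAG/task objects.
    FakeTask = namedtuple("FakeTask", ["dataset", "table", "version"])
    FakeDag = namedtuple("FakeDag", ["tasks"])

    def task_for_table(dags, dataset, table):
        # `table` is the versioned table name, e.g. "table1_v1".
        for dag in dags:
            for task in dag.tasks:
                table_name = f"{task.table}_{task.version}"
                if task.dataset == dataset and table_name == table:
                    return task
        return None

    dags = [FakeDag(tasks=[FakeTask("test", "table1", "v1")])]
    assert task_for_table(dags, "test", "table1_v1") is not None
    assert task_for_table(dags, "test", "table2_v1") is None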
@@ -89,18 +89,22 @@ class Task:
         with open(self.query_file) as query_stream:
             query = query_stream.read()
             query_job = client.query(query, job_config=job_config)
-            return query_job.referenced_tables
+            referenced_tables = query_job.referenced_tables
+            table_names = [(t.dataset_id, t.table_id) for t in referenced_tables]
+            return table_names

     def get_dependencies(self, client, dag_collection):
         """Perform a dry_run to get upstream dependencies."""
         dependencies = []

         for table in self._get_referenced_tables(client):
-            upstream_task = dag_collection.task_for_table()
+            upstream_task = dag_collection.task_for_table(table[0], table[1])

             if upstream_task is not None:
                 dependencies.append(upstream_task)

+        return dependencies
+
     def to_airflow(self, client, dag_collection):
         """Convert the task configuration into the Airflow representation."""
         dependencies = self.get_dependencies()
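Note: the dry-run pattern that _get_referenced_tables relies on can be sketched standalone. This is a minimal example, not the repository code; it assumes a google-cloud-bigquery client with valid credentials, and the project, dataset, and query names are placeholders:

    from google.cloud import bigquery

    def referenced_table_names(client, sql):
        # A dry-run query is not executed or billed, but BigQuery still
        # resolves which tables the query would read.
        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
        query_job = client.query(sql, job_config=job_config)
        # referenced_tables is a list of TableReference objects; keep
        # (dataset_id, table_id) pairs for the upstream-task lookup.
        return [(t.dataset_id, t.table_id) for t in query_job.referenced_tables]

    # Placeholder usage:
    # client = bigquery.Client("my-project")
    # referenced_table_names(client, "SELECT * FROM my_dataset.table1_v1")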
@@ -1,8 +1,11 @@
+from google.cloud import bigquery
 from pathlib import Path

+import os
 import pytest

 from bigquery_etl.query_scheduling.task import Task, UnscheduledTask, TaskParseException
-from bigquery_etl.metadata.parse_metadata import Metadata
+from bigquery_etl.parse_metadata import Metadata
+from bigquery_etl.query_scheduling.dag_collection import DagCollection

 TEST_DIR = Path(__file__).parent.parent
@@ -106,4 +109,77 @@ class TestTask:
         )

         task = Task(query_file, metadata)
         task._dry_run()
+
+    @pytest.mark.integration
+    def test_task_get_dependencies_none(self, tmp_path):
+        client = bigquery.Client(os.environ["GOOGLE_PROJECT_ID"])
+
+        query_file_path = tmp_path / "sql" / "test" / "query_v1"
+        os.makedirs(query_file_path)
+
+        query_file = query_file_path / "query.sql"
+        query_file.write_text("SELECT 123423")
+
+        metadata = Metadata(
+            "test",
+            "test",
+            {},
+            {"dag_name": "test_dag", "depends_on_past": True, "param": "test_param"},
+        )
+
+        task = Task(query_file, metadata)
+        dags = DagCollection.from_dict({})
+        assert task.get_dependencies(client, dags) == []
+
+    @pytest.mark.integration
+    def test_task_get_multiple_dependencies(self, tmp_path):
+        project_id = os.environ["GOOGLE_PROJECT_ID"]
+        client = bigquery.Client(os.environ["GOOGLE_PROJECT_ID"])
+
+        query_file_path = tmp_path / "sql" / "test" / "query_v1"
+        os.makedirs(query_file_path)
+
+        query_file = query_file_path / "query.sql"
+        query_file.write_text(
+            f"SELECT * FROM {project_id}.test.table1_v1 "
+            + f"UNION ALL SELECT * FROM {project_id}.test.table2_v1"
+        )
+
+        schema = [bigquery.SchemaField("a", "STRING", mode="NULLABLE")]
+
+        table = bigquery.Table(f"{project_id}.test.table1_v1", schema=schema)
+        client.create_table(table)
+        table = bigquery.Table(f"{project_id}.test.table2_v1", schema=schema)
+        client.create_table(table)
+
+        metadata = Metadata(
+            "test",
+            "test",
+            {},
+            {"dag_name": "test_dag", "depends_on_past": True, "param": "test_param"},
+        )
+
+        task = Task(query_file, metadata)
+
+        table1_task = Task(
+            tmp_path / "sql" / "test" / "table1_v1" / "query.sql", metadata
+        )
+        table2_task = Task(
+            tmp_path / "sql" / "test" / "table2_v1" / "query.sql", metadata
+        )
+
+        dags = DagCollection.from_dict(
+            {"test_dag": {"schedule_interval": "daily", "default_args": {}}}
+        ).with_tasks([task, table1_task, table2_task])
+
+        result = task.get_dependencies(client, dags)
+
+        client.delete_table(f"{project_id}.test.table1_v1")
+        client.delete_table(f"{project_id}.test.table2_v1")
+
+        tables = [f"{t.dataset}__{t.table}__{t.version}" for t in result]
+
+        assert "test__table1__v1" in tables
+        assert "test__table2__v1" in tables
+
+    # todo: test queries with views
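Note: these new tests only run when the integration marker is selected and GOOGLE_PROJECT_ID points at a project the credentials can create tables in. A hypothetical conftest.py sketch (not part of this commit) showing one common way such a marker is registered and skipped when credentials are missing:

    import os

    import pytest

    def pytest_configure(config):
        # Register the marker so pytest does not warn about it.
        config.addinivalue_line(
            "markers", "integration: tests that talk to a real BigQuery project"
        )

    def pytest_collection_modifyitems(config, items):
        # Skip integration tests unless GOOGLE_PROJECT_ID is set.
        if os.environ.get("GOOGLE_PROJECT_ID"):
            return
        skip = pytest.mark.skip(reason="GOOGLE_PROJECT_ID is not set")
        for item in items:
            if "integration" in item.keywords:
                item.add_marker(skip)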