Fix DAG generation for different projects

This commit is contained in:
Anna Scholtz 2020-10-13 10:52:23 -07:00
Родитель d809e6fa2d
Коммит 8d1683c628
4 изменённых файлов: 76 добавлений и 52 удалений

Просмотреть файл

@ -48,56 +48,57 @@ parser.add_argument(
standard_args.add_log_level(parser)
def get_dags(sql_dir, dags_config):
def get_dags(project_id, dags_config):
"""Return all configured DAGs including associated tasks."""
tasks = []
dag_collection = DagCollection.from_file(dags_config)
# parse metadata.yaml to retrieve scheduling information
if os.path.isdir(sql_dir):
for root, dirs, files in os.walk(sql_dir):
try:
if QUERY_FILE in files:
query_file = os.path.join(root, QUERY_FILE)
task = Task.of_query(query_file, dag_collection=dag_collection)
elif QUERY_PART_FILE in files:
# multipart query
query_file = os.path.join(root, QUERY_PART_FILE)
task = Task.of_multipart_query(
query_file, dag_collection=dag_collection
)
elif SCRIPT_FILE in files:
query_file = os.path.join(root, SCRIPT_FILE)
task = Task.of_script(query_file, dag_collection=dag_collection)
for project_dir in project_dirs(project_id):
# parse metadata.yaml to retrieve scheduling information
if os.path.isdir(project_dir):
for root, dirs, files in os.walk(project_dir):
try:
if QUERY_FILE in files:
query_file = os.path.join(root, QUERY_FILE)
task = Task.of_query(query_file, dag_collection=dag_collection)
elif QUERY_PART_FILE in files:
# multipart query
query_file = os.path.join(root, QUERY_PART_FILE)
task = Task.of_multipart_query(
query_file, dag_collection=dag_collection
)
elif SCRIPT_FILE in files:
query_file = os.path.join(root, SCRIPT_FILE)
task = Task.of_script(query_file, dag_collection=dag_collection)
else:
continue
except FileNotFoundError:
# query has no metadata.yaml file; skip
pass
except UnscheduledTask:
# logging.debug(
# f"No scheduling information for {query_file}."
# )
#
# most tasks lack scheduling information for now
pass
except Exception as e:
# in the case that there was some other error, report the query
# that failed before exiting
logging.error(f"Error processing task for query {query_file}")
raise e
else:
continue
except FileNotFoundError:
# query has no metadata.yaml file; skip
pass
except UnscheduledTask:
# logging.debug(
# f"No scheduling information for {query_file}."
# )
#
# most tasks lack scheduling information for now
pass
except Exception as e:
# in the case that there was some other error, report the query
# that failed before exiting
logging.error(f"Error processing task for query {query_file}")
raise e
else:
tasks.append(task)
tasks.append(task)
else:
logging.error(
"""
Invalid sql_dir: {}, sql_dir must be a directory with
structure <sql_dir>/<project>/<dataset>/<table>/metadata.yaml.
""".format(
sql_dir
else:
logging.error(
"""
Invalid project_dir: {}, project_dir must be a directory with
structure <sql>/<project>/<dataset>/<table>/metadata.yaml.
""".format(
project_dir
)
)
)
return dag_collection.with_tasks(tasks)
@ -107,13 +108,8 @@ def main():
args = parser.parse_args()
dags_output_dir = Path(args.output_dir)
projects = project_dirs(args.project_id)
for project in projects:
# todo: add support for multiple projects to publish tasks to the same DAG
if "moz-fx-data-shared-prod" in project:
dags = get_dags(project, args.dags_config)
dags.to_airflow_dags(dags_output_dir, args.dag_id)
dags = get_dags(args.project_id, args.dags_config)
dags.to_airflow_dags(dags_output_dir, args.dag_id)
if __name__ == "__main__":

Просмотреть файл

@ -0,0 +1,14 @@
friendly_name: "Test table for an incremental query"
description: "Test table for an incremental query"
owners:
- ascholtz@mozilla.com
labels:
schedule: daily
public_json: true
incremental: true
incremental_export: true
review_bug: 123456
scheduling:
dag_name: "bqetl_events"
depends_on_past: false
arguments: ["--append_table"]

Просмотреть файл

@ -0,0 +1,14 @@
SELECT
DATE '2020-03-15' AS d,
"val1" AS a,
2 AS b
UNION ALL
SELECT
DATE '2020-03-15' AS d,
"val2" AS a,
34 AS b
UNION ALL
SELECT
@submission_date AS d,
"val3" AS a,
@a AS b

Просмотреть файл

@ -6,7 +6,7 @@ TEST_DIR = Path(__file__).parent.parent
class TestGenerateAirflowDags(object):
sql_dir = TEST_DIR / "data" / "test_sql" / "moz-fx-data-test-project"
sql_dir = TEST_DIR / "data" / "test_sql"
dags_config = TEST_DIR / "data" / "dags.yaml"
def test_get_dags(self):
@ -18,7 +18,7 @@ class TestGenerateAirflowDags(object):
assert dags.dag_by_name("bqetl_core") is not None
events_dag = dags.dag_by_name("bqetl_events")
assert len(events_dag.tasks) == 2
assert len(events_dag.tasks) == 3
assert events_dag.tasks[0].depends_on_past is False
assert events_dag.tasks[1].depends_on_past is False