DENG 946 - Update DAG generation to include ETL checks (#3969)

* CAccomodate dq checks in dag generation

* Modify the tests to include dq check

* Generate dags to include bigquery_dq_check

* rename destination to source for dq check

* Add DQ check to download attribution dag

* Update bigquery_etl/query_scheduling/templates/airflow_dag.j2

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Update bigquery_etl/query_scheduling/generate_airflow_dags.py

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Set upstream check dependencies using upstream_dependencies

* Change bigquery_dq_check as per gcp.py utils

* remove sql_file_path in airflow jinja

* Fix download attribution dag

---------

Co-authored-by: Anna Scholtz <anna@scholtzan.net>
This commit is contained in:
Alekhya 2023-06-28 13:50:41 -04:00 коммит произвёл GitHub
Родитель 6e42377cb8
Коммит 01333782b1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
71 изменённых файлов: 160 добавлений и 73 удалений

Просмотреть файл

@ -1,12 +1,13 @@
"""Generates Airflow DAGs for scheduled queries."""
import copy
import logging
import os
from argparse import ArgumentParser
from pathlib import Path
from bigquery_etl.query_scheduling.dag_collection import DagCollection
from bigquery_etl.query_scheduling.task import Task, UnscheduledTask
from bigquery_etl.query_scheduling.task import Task, TaskRef, UnscheduledTask
from bigquery_etl.util import standard_args
from bigquery_etl.util.common import project_dirs
@ -17,6 +18,7 @@ SCRIPT_FILE = "script.sql"
PYTHON_SCRIPT_FILE = "query.py"
DEFAULT_DAGS_DIR = "dags"
TELEMETRY_AIRFLOW_GITHUB = "https://github.com/mozilla/telemetry-airflow.git"
CHECKS_FILE = "checks.sql"
parser = ArgumentParser(description=__doc__)
parser.add_argument(
@ -52,7 +54,6 @@ def get_dags(project_id, dags_config):
"""Return all configured DAGs including associated tasks."""
tasks = []
dag_collection = DagCollection.from_file(dags_config)
for project_dir in project_dirs(project_id):
# parse metadata.yaml to retrieve scheduling information
if os.path.isdir(project_dir):
@ -93,6 +94,17 @@ def get_dags(project_id, dags_config):
logging.error(f"Error processing task for query {query_file}")
raise e
else:
if CHECKS_FILE in files:
checks_file = os.path.join(root, CHECKS_FILE)
checks_task = copy.deepcopy(
Task.of_dq_check(checks_file, dag_collection=dag_collection)
)
tasks.append(checks_task)
task_ref = TaskRef(
dag_name=task.dag_name,
task_id=task.task_name,
)
checks_task.upstream_dependencies.append(task_ref)
tasks.append(task)
else:
@ -104,7 +116,6 @@ def get_dags(project_id, dags_config):
project_dir
)
)
return dag_collection.with_tasks(tasks)

Просмотреть файл

@ -27,7 +27,11 @@ from bigquery_etl.query_scheduling.utils import (
AIRFLOW_TASK_TEMPLATE = "airflow_task.j2"
QUERY_FILE_RE = re.compile(
r"^(?:.*/)?([a-zA-Z0-9_-]+)/([a-zA-Z0-9_]+)/"
r"([a-zA-Z0-9_]+)_(v[0-9]+)/(?:query\.sql|part1\.sql|script\.sql|query\.py)$"
r"([a-zA-Z0-9_]+)_(v[0-9]+)/(?:query\.sql|part1\.sql|script\.sql|query\.py|checks\.sql)$"
)
CHECKS_FILE_RE = re.compile(
r"^(?:.*/)?([a-zA-Z0-9_-]+)/([a-zA-Z0-9_]+)/"
r"([a-zA-Z0-9_]+)_(v[0-9]+)/(?:checks\.sql)$"
)
DEFAULT_DESTINATION_TABLE_STR = "use-default-destination-table"
MAX_TASK_NAME_LENGTH = 250
@ -215,6 +219,7 @@ class Task:
referenced_tables: Optional[List[Tuple[str, str, str]]] = attr.ib(None)
destination_table: Optional[str] = attr.ib(default=DEFAULT_DESTINATION_TABLE_STR)
is_python_script: bool = attr.ib(False)
is_dq_check: bool = attr.ib(False)
task_concurrency: Optional[int] = attr.ib(None)
retry_delay: Optional[str] = attr.ib(None)
retries: Optional[int] = attr.ib(None)
@ -293,6 +298,7 @@ class Task:
def __attrs_post_init__(self):
"""Extract information from the query file name."""
query_file_re = re.search(QUERY_FILE_RE, self.query_file)
check_file_re = re.search(CHECKS_FILE_RE, self.query_file)
if query_file_re:
self.project = query_file_re.group(1)
self.dataset = query_file_re.group(2)
@ -306,6 +312,14 @@ class Task:
]
self.validate_task_name(None, self.task_name)
if check_file_re is not None:
self.task_name = (
f"checks__{self.dataset}__{self.table}__{self.version}"[
-MAX_TASK_NAME_LENGTH:
]
)
self.validate_task_name(None, self.task_name)
if self.destination_table == DEFAULT_DESTINATION_TABLE_STR:
self.destination_table = f"{self.table}_{self.version}"
@ -451,6 +465,15 @@ class Task:
task.is_python_script = True
return task
@classmethod
def of_dq_check(cls, query_file, metadata=None, dag_collection=None):
"""Create a task that schedules DQ check file in Airflow."""
task = cls.of_query(query_file, metadata, dag_collection)
task.query_file_path = query_file
task.is_dq_check = True
task.depends_on_fivetran = []
return task
def to_ref(self, dag_collection):
"""Return the task as `TaskRef`."""
return TaskRef(

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
{% set ns = namespace(uses_fivetran=False) -%}
{% for task in tasks -%}
@ -56,6 +56,45 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest',
owner='{{ task.owner }}',
email={{ task.email | sort }},
{% elif task.is_dq_check -%}
{{ task.task_name }} = bigquery_dq_check(
task_id='{{ task.task_name }}',
{#+ TODO when Airflow is updated to 2.2+ use ds_nodash instead of ds_format -#}
source_table={%+ if task.date_partition_offset -%}'{{ task.destination_table }}${% raw %}{{{% endraw %} macros.ds_format(macros.ds_add(ds, {{ task.date_partition_offset }}), "%Y-%m-%d", "%Y%m%d") {% raw %}}}{% endraw %}'
{%+ elif task.destination_table -%}'{{ task.destination_table }}'
{%+ else -%}None
{%+ endif -%},
{%+ if task.query_project -%}
dataset_id='{{ task.project }}:{{ task.dataset }}',
project_id='{{ task.query_project }}',
{%+ else -%}
dataset_id='{{ task.dataset }}',
project_id='{{ task.project }}',
{%+ endif -%}
owner='{{ task.owner }}',
{%+ if task.email | length > 0 -%}
email={{ task.email | sort }},
{%+ endif -%}
{%+ if task.start_date -%}
start_date={{ task.start_date | format_date | format_repr }},
{%+ endif -%}
depends_on_past={{ task.depends_on_past }},
{%+ if (
task.destination_table
and not task.date_partition_parameter
and not '$' in task.destination_table
and not task.depends_on_past
and not task.task_concurrency
) -%}
{#+ Avoid having concurrent tasks for ETLs that target the whole table. -#}
task_concurrency=1,
{%+ endif -%}
{%+ if task.arguments | length > 0 -%}
arguments={{ task.arguments }},
{%+ endif -%}
{%+ if task.parameters | length > 0 -%}
parameters={{ task.parameters }},
{%+ endif -%}
{%+ else -%}
{{ task.task_name }} = bigquery_etl_query(
task_id='{{ task.task_name }}',

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_acoustic_contact_export

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_acoustic_raw_recipient_export

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_activity_stream

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_addons

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_adjust

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_amo_stats

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_analytics_aggregations

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_analytics_tables

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_campaign_cost_breakdowns

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_core

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_ctxsvc_derived

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_deletion_request_volume

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_desktop_funnel

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_desktop_mobile_search_monthly

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_desktop_platform

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_devtools

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_domain_meta

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_download_funnel_attribution
@ -43,6 +43,16 @@ with DAG(
doc_md=docs,
tags=tags,
) as dag:
checks__ga_derived__downloads_with_attribution__v2 = bigquery_dq_check(
task_id="checks__ga_derived__downloads_with_attribution__v2",
source_table='downloads_with_attribution_v2${{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%Y%m%d") }}',
dataset_id="ga_derived",
project_id="moz-fx-data-marketing-prod",
owner="gleonard@mozilla.com",
email=["gleonard@mozilla.com", "telemetry-alerts@mozilla.com"],
depends_on_past=False,
)
ga_derived__downloads_with_attribution__v2 = bigquery_etl_query(
task_id="ga_derived__downloads_with_attribution__v2",
destination_table='downloads_with_attribution_v2${{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%Y%m%d") }}',
@ -55,6 +65,10 @@ with DAG(
parameters=["download_date:DATE:{{macros.ds_add(ds, -1)}}"],
)
checks__ga_derived__downloads_with_attribution__v2.set_upstream(
ga_derived__downloads_with_attribution__v2
)
wait_for_ga_derived__www_site_empty_check__v1 = ExternalTaskSensor(
task_id="wait_for_ga_derived__www_site_empty_check__v1",
external_dag_id="bqetl_google_analytics_derived",

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_error_aggregates

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_event_rollup

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_experimenter_experiments_import

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_experiments_daily

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_feature_usage

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fenix_event_rollup

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fenix_external

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_firefox_ios

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fivetran_apple_ads

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fivetran_google_ads

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fog_decision_support

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_fxa_events

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_google_analytics_derived

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_gud

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_internal_tooling

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_internet_outages

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_iprospect

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_main_summary

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_mdn_yari

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_messaging_system

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_mobile_activation

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_mobile_search

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_monitoring

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_mozilla_vpn_site_metrics

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_newtab

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_nondesktop

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_org_mozilla_fenix_derived

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_org_mozilla_firefox_derived

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_org_mozilla_focus_derived

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_pocket

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_regrets_reporter_summary

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_release_criteria

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_releases

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_search

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_search_dashboard

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_search_terms_daily

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_sponsored_tiles_clients_daily

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_ssl_ratios

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_status_check

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_unified

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_urlbar

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_test_dag

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_test_dag

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_external_test_dag

Просмотреть файл

@ -6,7 +6,7 @@ from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import bigquery_etl_query, gke_command
from utils.gcp import bigquery_etl_query, gke_command, bigquery_dq_check
docs = """
### bqetl_test_dag