Generate ExternalTaskMarkers for Airflow downstream dependencies

This commit is contained in:
Anna Scholtz 2022-06-14 13:50:53 -07:00
Родитель 0cad987ad9
Коммит 2f5c6ac41a
70 изменённых файлов: 986 добавлений и 304 удалений

Просмотреть файл

@ -208,28 +208,41 @@ class Dag:
return env
def to_airflow_dag(self, dag_collection):
def to_airflow_dag(self):
"""Convert the DAG to its Airflow representation and return the python code."""
env = self._jinja_env()
dag_template = env.get_template(AIRFLOW_DAG_TEMPLATE)
args = self.__dict__
if len(args["tasks"]) == 0:
return dag_template.render(args)
def with_upstream_dependencies(self, dag_collection):
"""Determine upstream dependencies of DAG tasks."""
if len(self.tasks) == 0:
raise InvalidDag(
f"DAG {self.name} has no tasks - cannot convert it to a valid .py DAG "
f"file. Does it appear under `scheduling` in any metadata.yaml files?"
)
for task in args["tasks"]:
task.with_dependencies(dag_collection)
for task in self.tasks:
task.with_upstream_dependencies(dag_collection)
return dag_template.render(args)
def with_downstream_dependencies(self, dag_collection):
    """Populate downstream dependencies on every task of this DAG.

    Each task is asked to resolve its own downstream dependencies against
    ``dag_collection``.

    Raises ``InvalidDag`` when the DAG has no tasks.
    """
    dag_tasks = self.tasks

    # A DAG without tasks cannot be rendered, so fail early and loudly.
    if len(dag_tasks) == 0:
        message = (
            f"DAG {self.name} has no tasks - cannot convert it to a valid .py DAG "
            "file. Does it appear under `scheduling` in any metadata.yaml files?"
        )
        raise InvalidDag(message)

    for dag_task in dag_tasks:
        dag_task.with_downstream_dependencies(dag_collection)
class PublicDataJsonDag(Dag):
"""Special DAG with tasks exporting public json data to GCS."""
def to_airflow_dag(self, dag_collection):
def to_airflow_dag(self):
"""Convert the DAG to its Airflow representation and return the python code."""
env = self._jinja_env()
dag_template = env.get_template(PUBLIC_DATA_JSON_DAG_TEMPLATE)

Просмотреть файл

@ -20,6 +20,25 @@ class DagCollection:
self.dags = dags
self.dags_by_name = {dag.name: dag for dag in dags}
@property
def downstream_tasks(self):
    """Return a lookup of cross-DAG downstream tasks, keyed by upstream reference.

    For every task in every DAG of the collection, each entry of
    ``task.depends_on + task.upstream_dependencies`` that belongs to a
    different DAG than the task itself becomes a key; the value is the list
    of tasks that depend on it. Dependencies within the same DAG are ignored.

    The lookup is built on first access and cached on the instance as
    ``_downstream_tasks``.
    """
    cached = getattr(self, "_downstream_tasks", None)
    # Compare against None instead of relying on truthiness: an empty lookup
    # is a valid result and must not trigger a rebuild on every access.
    if cached is not None:
        return cached

    lookup = {}
    for dag in self.dags:
        for task in dag.tasks:
            for upstream_dependency in task.depends_on + task.upstream_dependencies:
                if upstream_dependency.dag_name != task.dag_name:
                    lookup.setdefault(upstream_dependency, []).append(task)

    # Publish the cache only once it is complete, so that an exception raised
    # while building it cannot leave a partial lookup behind.
    self._downstream_tasks = lookup
    return lookup
@classmethod
def from_dict(cls, d):
"""
@ -104,10 +123,18 @@ class DagCollection:
"""Generate the Airflow DAG representation for the provided DAG."""
output_file = Path(output_dir) / (dag.name + ".py")
formatted_dag = format_file_contents(
dag.to_airflow_dag(self), fast=False, mode=FileMode()
dag.to_airflow_dag(), fast=False, mode=FileMode()
)
output_file.write_text(formatted_dag)
def _get_upstream_dependencies(self, dag):
    """Resolve upstream dependencies of `dag` against this collection and return it."""
    dag.with_upstream_dependencies(self)
    # The DAG is returned so that callers can collect the updated objects.
    return dag
def _get_downstream_dependencies(self, dag):
    """Resolve downstream dependencies of `dag` against this collection and return it."""
    dag.with_downstream_dependencies(self)
    # The DAG is returned so that callers can collect the updated objects.
    return dag
def to_airflow_dags(self, output_dir, dag_to_generate=None):
"""Write DAG representation as Airflow dags to file."""
# https://pythonspeed.com/articles/python-multiprocessing/
@ -117,6 +144,13 @@ class DagCollection:
set_start_method("spawn")
except Exception:
pass
with get_context("spawn").Pool(8) as p:
self.dags = p.map(self._get_upstream_dependencies, self.dags)
with get_context("spawn").Pool(8) as p:
self.dags = p.map(self._get_downstream_dependencies, self.dags)
to_airflow_dag = partial(self.dag_to_airflow, output_dir)
if dag_to_generate is None:

Просмотреть файл

@ -54,6 +54,12 @@ def format_timedelta(timdelta_string):
return timedelta(**time_params)
def format_timedelta_macro(timedelta_string):
    """Format a timedelta string as an Airflow ``macros.timedelta(...)`` expression.

    The string is converted with ``format_timedelta`` and the ``repr`` of the
    result is taken, with every occurrence of ``datetime`` replaced by
    ``macros``.
    """
    delta_repr = repr(format_timedelta(timedelta_string))
    return delta_repr.replace("datetime", "macros")
def format_optional_string(val):
"""Format a value that is either None or a string."""
if val is None:

Просмотреть файл

@ -162,8 +162,12 @@ class Task:
date_partition_parameter: Optional[str] = "submission_date"
# indicate whether data should be published as JSON
public_json: bool = attr.ib(False)
# upstream dependencies
depends_on: List[TaskRef] = attr.ib([])
depends_on_fivetran: List[FivetranTask] = attr.ib([])
external_downstream_tasks: List[TaskRef] = attr.ib([])
upstream_dependencies: List[TaskRef] = attr.ib([])
downstream_dependencies: List[TaskRef] = attr.ib([])
arguments: List[str] = attr.ib([])
parameters: List[str] = attr.ib([])
multipart: bool = attr.ib(False)
@ -351,6 +355,16 @@ class Task:
task.is_python_script = True
return task
def to_ref(self, dag_collection):
    """Build a `TaskRef` that points at this task.

    The schedule interval is taken from the DAG that ``dag_collection``
    returns for this task's ``dag_name``.
    """
    dag_name = self.dag_name
    task_id = self.task_name
    schedule_interval = dag_collection.dag_by_name(dag_name).schedule_interval
    return TaskRef(
        dag_name=dag_name,
        task_id=task_id,
        schedule_interval=schedule_interval,
    )
def _get_referenced_tables(self):
"""Use zetasql to get tables the query depends on."""
logging.info(f"Get dependencies for {self.task_name}")
@ -377,7 +391,7 @@ class Task:
self.referenced_tables = sorted(table_names)
return self.referenced_tables
def with_dependencies(self, dag_collection):
def with_upstream_dependencies(self, dag_collection):
"""Perfom a dry_run to get upstream dependencies."""
dependencies = []
@ -391,13 +405,7 @@ class Task:
upstream_task = dag_collection.task_for_table(table[0], table[1], table[2])
if upstream_task is not None:
task = TaskRef(
dag_name=upstream_task.dag_name,
task_id=upstream_task.task_name,
schedule_interval=dag_collection.dag_by_name(
upstream_task.dag_name
).schedule_interval,
)
task = upstream_task.to_ref(dag_collection)
if not _duplicate_dependency(task):
dependencies.append(task)
else:
@ -408,4 +416,15 @@ class Task:
dependencies.append(task)
break # stop after the first match
self.dependencies = dependencies
self.upstream_dependencies = dependencies
def with_downstream_dependencies(self, dag_collection):
    """Set ``downstream_dependencies`` from the collection's downstream lookup.

    This task's own reference is looked up in
    ``dag_collection.downstream_tasks``; every task found there that lives
    in a different DAG is converted to a reference and stored on
    ``self.downstream_dependencies``.
    """
    own_ref = self.to_ref(dag_collection)
    downstream_refs = []
    for candidate in dag_collection.downstream_tasks.get(own_ref, []):
        # Only tasks living in other DAGs are recorded.
        if candidate.dag_name == self.dag_name:
            continue
        downstream_refs.append(candidate.to_ref(dag_collection))
    self.downstream_dependencies = downstream_refs

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -119,19 +121,35 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{% if not task.is_python_script -%}
{% endif -%}
)
{% if (task.downstream_dependencies + task.external_downstream_tasks)|length > 0 -%}
with TaskGroup('{{ task.task_name }}_external') as {{ task.task_name }}_external:
{% for downstream_task in task.downstream_dependencies + task.external_downstream_tasks -%}
ExternalTaskMarker(
task_id='{{ downstream_task.dag_name }}__wait_for_{{ downstream_task.task_id }}',
external_dag_id='{{ downstream_task.dag_name }}',
external_task_id='wait_for_{{ downstream_task.task_id }}',
{% if downstream_task.get_execution_delta(schedule_interval) -%}
execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
{% endif -%}
)
{% endfor -%}
{{ task.task_name }}_external.set_upstream({{ task.task_name }})
{% endif -%}
{% endfor -%}
{% set wait_for_seen = [] -%}
{% set fivetran_seen = [] -%}
{% for task in tasks | sort(attribute='task_name') %}
{% for dependency in (task.dependencies + task.depends_on) | sort(attribute='task_id') -%}
{% for dependency in (task.upstream_dependencies + task.depends_on) | sort(attribute='task_id') -%}
{% if dependency.dag_name == name and dependency not in task.depends_on -%}
{% if dependency.task_id != task.task_name %}
{{ task.task_name }}.set_upstream({{ dependency.task_id }})
{% endif -%}
{% else -%}
{% if (dependency.dag_name, dependency.task_id) not in wait_for_seen -%}
wait_for_{{ dependency.task_id }} = ExternalTaskCompletedSensor(
wait_for_{{ dependency.task_id }} = ExternalTaskSensor(
task_id='wait_for_{{ dependency.task_id }}',
external_dag_id='{{ dependency.dag_name }}',
external_task_id='{{ dependency.task_id }}',

Просмотреть файл

@ -4,7 +4,7 @@ from airflow import DAG
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from operators.task_sensor import ExternalTaskSensor
from utils.gcp import gke_command
docs = """
@ -52,11 +52,11 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{% endfor -%}
{% for task in tasks | sort(attribute='task_name') %}
{% for dependency in task.dependencies | sort(attribute='task_id') -%}
{% for dependency in task.upstream_dependencies | sort(attribute='task_id') -%}
{% if dependency.dag_name == name -%}
{{ task.task_name }}.set_upstream({{ dependency.task_id }})
{% else -%}
wait_for_{{ dependency.task_id }} = ExternalTaskCompletedSensor(
wait_for_{{ dependency.task_id }} = ExternalTaskSensor(
task_id='wait_for_{{ dependency.task_id }}',
external_dag_id='{{ dependency.dag_name }}',
external_task_id='{{ dependency.task_id }}',
@ -72,6 +72,22 @@ with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None
{{ task.task_name }}.set_upstream(wait_for_{{ dependency.task_id }})
{% endif -%}
{% endfor -%}
{% if (task.downstream_dependencies + task.external_downstream_tasks)|length > 0 -%}
with TaskGroup('{{ task.task_name }}_external') as {{ task.task_name }}_external:
{% for downstream_task in task.downstream_dependencies + task.external_downstream_tasks -%}
ExternalTaskMarker(
task_id='{{ downstream_task.dag_name }}__wait_for_{{ downstream_task.task_id }}',
external_dag_id='{{ downstream_task.dag_name }}',
external_task_id='wait_for_{{ downstream_task.task_id }}',
{% if downstream_task.get_execution_delta(schedule_interval) -%}
execution_date="{% raw %}{{{% endraw %} (execution_date + {{ downstream_task.get_execution_delta(schedule_interval) | format_timedelta_macro }}).isoformat() {% raw %}}}{% endraw %}",
{% endif -%}
)
{% endfor -%}
{{ task.task_name }}_external.set_upstream({{ task.task_name }})
{% endif -%}
{% endfor %}
public_data_gcs_metadata = gke_command(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -81,7 +83,7 @@ with DAG(
acoustic__contact_current_snapshot__v1.set_upstream(acoustic__contact__v1)
wait_for_fivetran_load_completed = ExternalTaskCompletedSensor(
wait_for_fivetran_load_completed = ExternalTaskSensor(
task_id="wait_for_fivetran_load_completed",
external_dag_id="fivetran_acoustic_contact_export",
external_task_id="fivetran_load_completed",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -68,7 +70,7 @@ with DAG(
acoustic__raw_recipient__v1.set_upstream(acoustic__raw_recipient_raw__v1)
wait_for_fivetran_load_completed = ExternalTaskCompletedSensor(
wait_for_fivetran_load_completed = ExternalTaskSensor(
task_id="wait_for_fivetran_load_completed",
external_dag_id="fivetran_acoustic_raw_recipient_export",
external_task_id="fivetran_load_completed",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -70,7 +72,7 @@ with DAG(
activity_stream_bi__impression_stats_flat__v1
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -78,6 +80,19 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"telemetry_derived__addons__v2_external"
) as telemetry_derived__addons__v2_external:
ExternalTaskMarker(
task_id="bqetl_feature_usage__wait_for_telemetry_derived__feature_usage__v2",
external_dag_id="bqetl_feature_usage",
external_task_id="wait_for_telemetry_derived__feature_usage__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
telemetry_derived__addons__v2_external.set_upstream(
telemetry_derived__addons__v2
)
telemetry_derived__addons_daily__v1 = bigquery_etl_query(
task_id="telemetry_derived__addons_daily__v1",
destination_table="addons_daily_v1",
@ -89,7 +104,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -107,7 +122,7 @@ with DAG(
telemetry_derived__addons__v2.set_upstream(wait_for_copy_deduplicate_main_ping)
wait_for_search_derived__search_clients_daily__v8 = ExternalTaskCompletedSensor(
wait_for_search_derived__search_clients_daily__v8 = ExternalTaskSensor(
task_id="wait_for_search_derived__search_clients_daily__v8",
external_dag_id="bqetl_search",
external_task_id="search_derived__search_clients_daily__v8",
@ -120,7 +135,7 @@ with DAG(
telemetry_derived__addons_daily__v1.set_upstream(
wait_for_search_derived__search_clients_daily__v8
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -121,7 +123,7 @@ with DAG(
amo_prod__amo_stats_dau__v2.set_upstream(amo_prod__fenix_addons_by_client__v1)
wait_for_bq_main_events = ExternalTaskCompletedSensor(
wait_for_bq_main_events = ExternalTaskSensor(
task_id="wait_for_bq_main_events",
external_dag_id="copy_deduplicate",
external_task_id="bq_main_events",
@ -132,7 +134,7 @@ with DAG(
)
amo_prod__amo_stats_installs__v3.set_upstream(wait_for_bq_main_events)
wait_for_event_events = ExternalTaskCompletedSensor(
wait_for_event_events = ExternalTaskSensor(
task_id="wait_for_event_events",
external_dag_id="copy_deduplicate",
external_task_id="event_events",
@ -144,7 +146,7 @@ with DAG(
amo_prod__amo_stats_installs__v3.set_upstream(wait_for_event_events)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -158,7 +160,7 @@ with DAG(
wait_for_copy_deduplicate_main_ping
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -106,7 +108,7 @@ with DAG(
depends_on_past=False,
)
wait_for_telemetry_derived__unified_metrics__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__unified_metrics__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__unified_metrics__v1",
external_dag_id="bqetl_unified",
external_task_id="telemetry_derived__unified_metrics__v1",
@ -128,7 +130,7 @@ with DAG(
wait_for_telemetry_derived__unified_metrics__v1
)
wait_for_telemetry_derived__rolling_cohorts__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__rolling_cohorts__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__rolling_cohorts__v1",
external_dag_id="bqetl_unified",
external_task_id="telemetry_derived__rolling_cohorts__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -65,7 +67,44 @@ with DAG(
priority_weight=70,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
with TaskGroup(
"telemetry_derived__core_clients_last_seen__v1_external"
) as telemetry_derived__core_clients_last_seen__v1_external:
ExternalTaskMarker(
task_id="bqetl_nondesktop__wait_for_telemetry_derived__firefox_nondesktop_day_2_7_activation__v1",
external_dag_id="bqetl_nondesktop",
external_task_id="wait_for_telemetry_derived__firefox_nondesktop_day_2_7_activation__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_nondesktop__wait_for_telemetry_derived__firefox_nondesktop_exact_mau28__v1",
external_dag_id="bqetl_nondesktop",
external_task_id="wait_for_telemetry_derived__firefox_nondesktop_exact_mau28__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_nondesktop__wait_for_firefox_nondesktop_exact_mau28_by_client_count_dimensions",
external_dag_id="bqetl_nondesktop",
external_task_id="wait_for_firefox_nondesktop_exact_mau28_by_client_count_dimensions",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_gud__wait_for_telemetry_derived__smoot_usage_nondesktop__v2",
external_dag_id="bqetl_gud",
external_task_id="wait_for_telemetry_derived__smoot_usage_nondesktop__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_unified__wait_for_telemetry_derived__unified_metrics__v1",
external_dag_id="bqetl_unified",
external_task_id="wait_for_telemetry_derived__unified_metrics__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
telemetry_derived__core_clients_last_seen__v1_external.set_upstream(
telemetry_derived__core_clients_last_seen__v1
)
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -78,16 +117,14 @@ with DAG(
telemetry_derived__core_clients_daily__v1.set_upstream(
wait_for_copy_deduplicate_all
)
wait_for_telemetry_derived__core_clients_first_seen__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__core_clients_first_seen__v1",
external_dag_id="copy_deduplicate",
external_task_id="telemetry_derived__core_clients_first_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_telemetry_derived__core_clients_first_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__core_clients_first_seen__v1",
external_dag_id="copy_deduplicate",
external_task_id="telemetry_derived__core_clients_first_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
telemetry_derived__core_clients_daily__v1.set_upstream(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -53,7 +55,7 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -76,7 +78,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -89,7 +91,7 @@ with DAG(
telemetry_derived__desktop_funnel_activation_day_6__v1.set_upstream(
wait_for_copy_deduplicate_all
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -58,7 +60,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -71,7 +73,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -85,7 +87,7 @@ with DAG(
wait_for_copy_deduplicate_main_ping
)
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily_joined__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_daily_joined__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -198,31 +200,27 @@ with DAG(
firefox_accounts_derived__event_types__v1
)
wait_for_firefox_accounts_derived__fxa_auth_events__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_auth_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_auth_events__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_auth_events__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_auth_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_auth_events__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
funnel_events_source__v1.set_upstream(
wait_for_firefox_accounts_derived__fxa_auth_events__v1
)
wait_for_firefox_accounts_derived__fxa_content_events__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_content_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_content_events__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_content_events__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_content_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_content_events__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
funnel_events_source__v1.set_upstream(
@ -233,7 +231,7 @@ with DAG(
messaging_system_derived__event_types_history__v1
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -103,7 +105,7 @@ with DAG(
telemetry_derived__experiments_daily_active_clients__v1
)
wait_for_bq_main_events = ExternalTaskCompletedSensor(
wait_for_bq_main_events = ExternalTaskSensor(
task_id="wait_for_bq_main_events",
external_dag_id="copy_deduplicate",
external_task_id="bq_main_events",
@ -116,7 +118,7 @@ with DAG(
telemetry_derived__experiment_enrollment_aggregates__v1.set_upstream(
wait_for_bq_main_events
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -129,7 +131,7 @@ with DAG(
telemetry_derived__experiment_enrollment_aggregates__v1.set_upstream(
wait_for_copy_deduplicate_all
)
wait_for_event_events = ExternalTaskCompletedSensor(
wait_for_event_events = ExternalTaskSensor(
task_id="wait_for_event_events",
external_dag_id="copy_deduplicate",
external_task_id="event_events",
@ -146,7 +148,7 @@ with DAG(
telemetry_derived__experiment_search_aggregates__v1.set_upstream(
wait_for_copy_deduplicate_all
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -163,7 +165,7 @@ with DAG(
telemetry_derived__experiments_daily_active_clients__v1.set_upstream(
wait_for_copy_deduplicate_all
)
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily_joined__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_daily_joined__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -66,7 +68,7 @@ with DAG(
depends_on_past=False,
)
wait_for_bq_main_events = ExternalTaskCompletedSensor(
wait_for_bq_main_events = ExternalTaskSensor(
task_id="wait_for_bq_main_events",
external_dag_id="copy_deduplicate",
external_task_id="bq_main_events",
@ -77,7 +79,7 @@ with DAG(
)
telemetry_derived__feature_usage__v2.set_upstream(wait_for_bq_main_events)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -88,7 +90,7 @@ with DAG(
)
telemetry_derived__feature_usage__v2.set_upstream(wait_for_copy_deduplicate_all)
wait_for_event_events = ExternalTaskCompletedSensor(
wait_for_event_events = ExternalTaskSensor(
task_id="wait_for_event_events",
external_dag_id="copy_deduplicate",
external_task_id="event_events",
@ -99,7 +101,7 @@ with DAG(
)
telemetry_derived__feature_usage__v2.set_upstream(wait_for_event_events)
wait_for_telemetry_derived__addons__v2 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__addons__v2 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__addons__v2",
external_dag_id="bqetl_addons",
external_task_id="telemetry_derived__addons__v2",
@ -112,7 +114,7 @@ with DAG(
telemetry_derived__feature_usage__v2.set_upstream(
wait_for_telemetry_derived__addons__v2
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",
@ -125,7 +127,7 @@ with DAG(
telemetry_derived__feature_usage__v2.set_upstream(
wait_for_telemetry_derived__clients_last_seen__v1
)
wait_for_telemetry_derived__main_1pct__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__main_1pct__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__main_1pct__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__main_1pct__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -74,7 +76,7 @@ with DAG(
fenix_derived__event_types__v1.set_upstream(fenix_derived__event_types_history__v1)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -52,7 +54,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -75,6 +77,43 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
with TaskGroup(
"firefox_accounts_derived__fxa_auth_events__v1_external"
) as firefox_accounts_derived__fxa_auth_events__v1_external:
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__login_flows__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__login_flows__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__fxa_attribution__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__fxa_attribution__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_cjms_bigquery__flows__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_cjms_bigquery__flows__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_event_rollup__wait_for_funnel_events_source__v1",
external_dag_id="bqetl_event_rollup",
external_task_id="wait_for_funnel_events_source__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=81000)).isoformat() }}",
)
firefox_accounts_derived__fxa_auth_events__v1_external.set_upstream(
firefox_accounts_derived__fxa_auth_events__v1
)
firefox_accounts_derived__fxa_content_events__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_content_events__v1",
destination_table="fxa_content_events_v1",
@ -87,6 +126,43 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
with TaskGroup(
"firefox_accounts_derived__fxa_content_events__v1_external"
) as firefox_accounts_derived__fxa_content_events__v1_external:
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__login_flows__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__login_flows__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__fxa_attribution__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__fxa_attribution__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_cjms_bigquery__flows__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_cjms_bigquery__flows__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_event_rollup__wait_for_funnel_events_source__v1",
external_dag_id="bqetl_event_rollup",
external_task_id="wait_for_funnel_events_source__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=81000)).isoformat() }}",
)
firefox_accounts_derived__fxa_content_events__v1_external.set_upstream(
firefox_accounts_derived__fxa_content_events__v1
)
firefox_accounts_derived__fxa_delete_events__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_delete_events__v1",
destination_table="fxa_delete_events_v1",
@ -143,6 +219,31 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
with TaskGroup(
"firefox_accounts_derived__fxa_stdout_events__v1_external"
) as firefox_accounts_derived__fxa_stdout_events__v1_external:
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__funnel_product_page_to_subscribed__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_mozilla_vpn_derived__fxa_attribution__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_mozilla_vpn_derived__fxa_attribution__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_subplat__wait_for_cjms_bigquery__flows__v1",
external_dag_id="bqetl_subplat",
external_task_id="wait_for_cjms_bigquery__flows__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=85500)).isoformat() }}",
)
firefox_accounts_derived__fxa_stdout_events__v1_external.set_upstream(
firefox_accounts_derived__fxa_stdout_events__v1
)
firefox_accounts_derived__fxa_users_daily__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_users_daily__v1",
destination_table="fxa_users_daily_v1",
@ -184,6 +285,19 @@ with DAG(
depends_on_past=True,
)
with TaskGroup(
"firefox_accounts_derived__fxa_users_last_seen__v1_external"
) as firefox_accounts_derived__fxa_users_last_seen__v1_external:
ExternalTaskMarker(
task_id="bqetl_gud__wait_for_telemetry_derived__smoot_usage_fxa__v2",
external_dag_id="bqetl_gud",
external_task_id="wait_for_telemetry_derived__smoot_usage_fxa__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=81000)).isoformat() }}",
)
firefox_accounts_derived__fxa_users_last_seen__v1_external.set_upstream(
firefox_accounts_derived__fxa_users_last_seen__v1
)
firefox_accounts_derived__fxa_users_services_daily__v1 = bigquery_etl_query(
task_id="firefox_accounts_derived__fxa_users_services_daily__v1",
destination_table="fxa_users_services_daily_v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -129,7 +131,7 @@ with DAG(
depends_on_past=False,
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",
@ -147,16 +149,14 @@ with DAG(
telemetry_derived__smoot_usage_desktop__v2
)
wait_for_firefox_accounts_derived__fxa_users_last_seen__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_users_last_seen__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_users_last_seen__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_users_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_users_last_seen__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_users_last_seen__v1",
execution_delta=datetime.timedelta(seconds=5400),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
telemetry_derived__smoot_usage_fxa__v2.set_upstream(
@ -183,7 +183,7 @@ with DAG(
telemetry_derived__smoot_usage_new_profiles__v2
)
wait_for_baseline_clients_last_seen = ExternalTaskCompletedSensor(
wait_for_baseline_clients_last_seen = ExternalTaskSensor(
task_id="wait_for_baseline_clients_last_seen",
external_dag_id="copy_deduplicate",
external_task_id="baseline_clients_last_seen",
@ -196,16 +196,14 @@ with DAG(
telemetry_derived__smoot_usage_nondesktop__v2.set_upstream(
wait_for_baseline_clients_last_seen
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
telemetry_derived__smoot_usage_nondesktop__v2.set_upstream(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -54,7 +56,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -54,7 +56,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -65,7 +67,7 @@ with DAG(
)
internet_outages__global_outages__v1.set_upstream(wait_for_copy_deduplicate_all)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -78,7 +80,7 @@ with DAG(
internet_outages__global_outages__v1.set_upstream(
wait_for_copy_deduplicate_main_ping
)
wait_for_telemetry_derived__clients_daily__v6 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_daily__v6 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily__v6",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_daily__v6",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -124,6 +126,37 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"telemetry_derived__clients_daily__v6_external"
) as telemetry_derived__clients_daily__v6_external:
ExternalTaskMarker(
task_id="bqetl_internet_outages__wait_for_internet_outages__global_outages__v1",
external_dag_id="bqetl_internet_outages",
external_task_id="wait_for_internet_outages__global_outages__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=68400)).isoformat() }}",
)
ExternalTaskMarker(
task_id="jetstream__wait_for_wait_for_clients_daily",
external_dag_id="jetstream",
external_task_id="wait_for_wait_for_clients_daily",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="operational_monitoring__wait_for_wait_for_clients_daily",
external_dag_id="operational_monitoring",
external_task_id="wait_for_wait_for_clients_daily",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="parquet_export__wait_for_wait_for_clients_daily",
external_dag_id="parquet_export",
external_task_id="wait_for_wait_for_clients_daily",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
telemetry_derived__clients_daily__v6_external.set_upstream(
telemetry_derived__clients_daily__v6
)
telemetry_derived__clients_daily_event__v1 = bigquery_etl_query(
task_id="telemetry_derived__clients_daily_event__v1",
destination_table="clients_daily_event_v1",
@ -158,6 +191,37 @@ with DAG(
priority_weight=85,
)
with TaskGroup(
"telemetry_derived__clients_daily_joined__v1_external"
) as telemetry_derived__clients_daily_joined__v1_external:
ExternalTaskMarker(
task_id="bqetl_search__wait_for_search_derived__search_clients_daily__v8",
external_dag_id="bqetl_search",
external_task_id="wait_for_search_derived__search_clients_daily__v8",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_devtools__wait_for_telemetry_derived__devtools_panel_usage__v1",
external_dag_id="bqetl_devtools",
external_task_id="wait_for_telemetry_derived__devtools_panel_usage__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_experiments_daily__wait_for_telemetry_derived__experiments_daily_active_clients__v1",
external_dag_id="bqetl_experiments_daily",
external_task_id="wait_for_telemetry_derived__experiments_daily_active_clients__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_urlbar__wait_for_telemetry_derived__urlbar_clients_daily__v1",
external_dag_id="bqetl_urlbar",
external_task_id="wait_for_telemetry_derived__urlbar_clients_daily__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
telemetry_derived__clients_daily_joined__v1_external.set_upstream(
telemetry_derived__clients_daily_joined__v1
)
telemetry_derived__clients_first_seen__v1 = bigquery_etl_query(
task_id="telemetry_derived__clients_first_seen__v1",
destination_table="clients_first_seen_v1",
@ -189,6 +253,55 @@ with DAG(
priority_weight=85,
)
with TaskGroup(
"telemetry_derived__clients_last_seen__v1_external"
) as telemetry_derived__clients_last_seen__v1_external:
ExternalTaskMarker(
task_id="bqetl_gud__wait_for_telemetry_derived__smoot_usage_desktop__v2",
external_dag_id="bqetl_gud",
external_task_id="wait_for_telemetry_derived__smoot_usage_desktop__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_addons__wait_for_telemetry_derived__addons_daily__v1",
external_dag_id="bqetl_addons",
external_task_id="wait_for_telemetry_derived__addons_daily__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=79200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_search_dashboard__wait_for_search_derived__desktop_search_aggregates_by_userstate__v1",
external_dag_id="bqetl_search_dashboard",
external_task_id="wait_for_search_derived__desktop_search_aggregates_by_userstate__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=79200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_desktop_funnel__wait_for_telemetry_derived__desktop_funnel_activation_day_6__v1",
external_dag_id="bqetl_desktop_funnel",
external_task_id="wait_for_telemetry_derived__desktop_funnel_activation_day_6__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=79200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_feature_usage__wait_for_telemetry_derived__feature_usage__v2",
external_dag_id="bqetl_feature_usage",
external_task_id="wait_for_telemetry_derived__feature_usage__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=75600)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_unified__wait_for_telemetry_derived__unified_metrics__v1",
external_dag_id="bqetl_unified",
external_task_id="wait_for_telemetry_derived__unified_metrics__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="taar_daily__wait_for_wait_for_clients_last_seen",
external_dag_id="taar_daily",
external_task_id="wait_for_wait_for_clients_last_seen",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
telemetry_derived__clients_last_seen__v1_external.set_upstream(
telemetry_derived__clients_last_seen__v1
)
telemetry_derived__clients_last_seen_event__v1 = bigquery_etl_query(
task_id="telemetry_derived__clients_last_seen_event__v1",
destination_table="clients_last_seen_event_v1",
@ -272,6 +385,19 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
with TaskGroup(
"telemetry_derived__main_1pct__v1_external"
) as telemetry_derived__main_1pct__v1_external:
ExternalTaskMarker(
task_id="bqetl_feature_usage__wait_for_telemetry_derived__feature_usage__v2",
external_dag_id="bqetl_feature_usage",
external_task_id="wait_for_telemetry_derived__feature_usage__v2",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=75600)).isoformat() }}",
)
telemetry_derived__main_1pct__v1_external.set_upstream(
telemetry_derived__main_1pct__v1
)
telemetry_derived__main_nightly__v1 = bigquery_etl_query(
task_id="telemetry_derived__main_nightly__v1",
destination_table="main_nightly_v1",
@ -308,6 +434,31 @@ with DAG(
priority_weight=90,
)
with TaskGroup(
"telemetry_derived__main_summary__v4_external"
) as telemetry_derived__main_summary__v4_external:
ExternalTaskMarker(
task_id="jetstream__wait_for_wait_for_main_summary",
external_dag_id="jetstream",
external_task_id="wait_for_wait_for_main_summary",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="operational_monitoring__wait_for_wait_for_main_summary",
external_dag_id="operational_monitoring",
external_task_id="wait_for_wait_for_main_summary",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="parquet_export__wait_for_wait_for_main_summary",
external_dag_id="parquet_export",
external_task_id="wait_for_wait_for_main_summary",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
telemetry_derived__main_summary__v4_external.set_upstream(
telemetry_derived__main_summary__v4
)
firefox_desktop_exact_mau28_by_client_count_dimensions.set_upstream(
telemetry_derived__clients_last_seen__v1
)
@ -320,7 +471,7 @@ with DAG(
telemetry_derived__clients_last_seen__v1
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
@ -334,7 +485,7 @@ with DAG(
wait_for_copy_deduplicate_main_ping
)
wait_for_bq_main_events = ExternalTaskCompletedSensor(
wait_for_bq_main_events = ExternalTaskSensor(
task_id="wait_for_bq_main_events",
external_dag_id="copy_deduplicate",
external_task_id="bq_main_events",
@ -345,7 +496,7 @@ with DAG(
)
telemetry_derived__clients_daily_event__v1.set_upstream(wait_for_bq_main_events)
wait_for_event_events = ExternalTaskCompletedSensor(
wait_for_event_events = ExternalTaskSensor(
task_id="wait_for_event_events",
external_dag_id="copy_deduplicate",
external_task_id="event_events",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -146,7 +148,7 @@ with DAG(
messaging_system_derived__cfr_users_last_seen__v1
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -72,6 +74,25 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"search_derived__mobile_search_clients_daily__v1_external"
) as search_derived__mobile_search_clients_daily__v1_external:
ExternalTaskMarker(
task_id="bqetl_search_dashboard__wait_for_search_derived__mobile_search_aggregates_for_searchreport__v1",
external_dag_id="bqetl_search_dashboard",
external_task_id="wait_for_search_derived__mobile_search_aggregates_for_searchreport__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=79200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_unified__wait_for_telemetry_derived__unified_metrics__v1",
external_dag_id="bqetl_unified",
external_task_id="wait_for_telemetry_derived__unified_metrics__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
search_derived__mobile_search_clients_daily__v1_external.set_upstream(
search_derived__mobile_search_clients_daily__v1
)
search_derived__mobile_search_clients_last_seen__v1 = bigquery_etl_query(
task_id="search_derived__mobile_search_clients_last_seen__v1",
destination_table="mobile_search_clients_last_seen_v1",
@ -91,7 +112,7 @@ with DAG(
search_derived__mobile_search_clients_daily__v1
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -173,7 +175,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
@ -191,7 +193,7 @@ with DAG(
monitoring_derived__stable_table_sizes__v1
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -85,7 +87,7 @@ with DAG(
depends_on_past=False,
)
wait_for_mozilla_vpn_derived__all_subscriptions__v1 = ExternalTaskCompletedSensor(
wait_for_mozilla_vpn_derived__all_subscriptions__v1 = ExternalTaskSensor(
task_id="wait_for_mozilla_vpn_derived__all_subscriptions__v1",
external_dag_id="bqetl_subplat",
external_task_id="mozilla_vpn_derived__all_subscriptions__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -82,23 +84,34 @@ with DAG(
depends_on_past=False,
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
with TaskGroup(
"telemetry_derived__mobile_usage__v1_external"
) as telemetry_derived__mobile_usage__v1_external:
ExternalTaskMarker(
task_id="kpi_forecasting__wait_for_wait_for_mobile_usage",
external_dag_id="kpi_forecasting",
external_task_id="wait_for_wait_for_mobile_usage",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
telemetry_derived__mobile_usage__v1_external.set_upstream(
telemetry_derived__mobile_usage__v1
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
firefox_nondesktop_exact_mau28_by_client_count_dimensions.set_upstream(
wait_for_telemetry_derived__core_clients_last_seen__v1
)
wait_for_baseline_clients_last_seen = ExternalTaskCompletedSensor(
wait_for_baseline_clients_last_seen = ExternalTaskSensor(
task_id="wait_for_baseline_clients_last_seen",
external_dag_id="copy_deduplicate",
external_task_id="baseline_clients_last_seen",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -49,7 +51,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -4,7 +4,7 @@ from airflow import DAG
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from operators.task_sensor import ExternalTaskSensor
from utils.gcp import gke_command
docs = """
@ -89,26 +89,10 @@ with DAG(
image=docker_image,
)
wait_for_client_probe_processes__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_client_probe_processes__v1",
external_dag_id="bqetl_main_summary",
external_task_id="client_probe_processes__v1",
execution_delta=datetime.timedelta(seconds=10800),
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
export_public_data_json_client_probe_processes__v1.set_upstream(
wait_for_client_probe_processes__v1
)
wait_for_mozregression_aggregates__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_mozregression_aggregates__v1",
external_dag_id="bqetl_internal_tooling",
external_task_id="mozregression_aggregates__v1",
execution_delta=datetime.timedelta(seconds=3600),
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
@ -116,14 +100,13 @@ with DAG(
)
export_public_data_json_mozregression_aggregates__v1.set_upstream(
wait_for_mozregression_aggregates__v1
wait_for_copy_deduplicate_all
)
wait_for_telemetry_derived__ssl_ratios__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__ssl_ratios__v1",
external_dag_id="bqetl_ssl_ratios",
external_task_id="telemetry_derived__ssl_ratios__v1",
execution_delta=datetime.timedelta(seconds=10800),
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
@ -131,7 +114,7 @@ with DAG(
)
export_public_data_json_telemetry_derived__ssl_ratios__v1.set_upstream(
wait_for_telemetry_derived__ssl_ratios__v1
wait_for_copy_deduplicate_main_ping
)
public_data_gcs_metadata = gke_command(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -53,7 +55,7 @@ with DAG(
parameters=["submission_date:DATE:{{ds}}"],
)
wait_for_copy_deduplicate_all = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_all = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_all",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -57,6 +59,19 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"search_derived__search_aggregates__v8_external"
) as search_derived__search_aggregates__v8_external:
ExternalTaskMarker(
task_id="bqetl_search_dashboard__wait_for_search_derived__desktop_search_aggregates_for_searchreport__v1",
external_dag_id="bqetl_search_dashboard",
external_task_id="wait_for_search_derived__desktop_search_aggregates_for_searchreport__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
search_derived__search_aggregates__v8_external.set_upstream(
search_derived__search_aggregates__v8
)
search_derived__search_clients_daily__v8 = bigquery_etl_query(
task_id="search_derived__search_clients_daily__v8",
destination_table="search_clients_daily_v8",
@ -72,6 +87,31 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"search_derived__search_clients_daily__v8_external"
) as search_derived__search_clients_daily__v8_external:
ExternalTaskMarker(
task_id="bqetl_addons__wait_for_telemetry_derived__addons_daily__v1",
external_dag_id="bqetl_addons",
external_task_id="wait_for_telemetry_derived__addons_daily__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=82800)).isoformat() }}",
)
ExternalTaskMarker(
task_id="jetstream__wait_for_wait_for_search_clients_daily",
external_dag_id="jetstream",
external_task_id="wait_for_wait_for_search_clients_daily",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
ExternalTaskMarker(
task_id="operational_monitoring__wait_for_wait_for_search_clients_daily",
external_dag_id="operational_monitoring",
external_task_id="wait_for_wait_for_search_clients_daily",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
search_derived__search_clients_daily__v8_external.set_upstream(
search_derived__search_clients_daily__v8
)
search_derived__search_clients_last_seen__v1 = bigquery_etl_query(
task_id="search_derived__search_clients_last_seen__v1",
destination_table="search_clients_last_seen_v1",
@ -87,6 +127,19 @@ with DAG(
depends_on_past=True,
)
with TaskGroup(
"search_derived__search_clients_last_seen__v1_external"
) as search_derived__search_clients_last_seen__v1_external:
ExternalTaskMarker(
task_id="ltv_daily__wait_for_wait_for_search_clients_last_seen",
external_dag_id="ltv_daily",
external_task_id="wait_for_wait_for_search_clients_last_seen",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
search_derived__search_clients_last_seen__v1_external.set_upstream(
search_derived__search_clients_last_seen__v1
)
search_derived__search_metric_contribution__v1 = bigquery_etl_query(
task_id="search_derived__search_metric_contribution__v1",
destination_table="search_metric_contribution_v1",
@ -107,7 +160,7 @@ with DAG(
search_derived__search_clients_daily__v8
)
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily_joined__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_daily_joined__v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -84,7 +86,7 @@ with DAG(
depends_on_past=False,
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",
@ -98,7 +100,7 @@ with DAG(
wait_for_telemetry_derived__clients_last_seen__v1
)
wait_for_search_derived__search_aggregates__v8 = ExternalTaskCompletedSensor(
wait_for_search_derived__search_aggregates__v8 = ExternalTaskSensor(
task_id="wait_for_search_derived__search_aggregates__v8",
external_dag_id="bqetl_search",
external_task_id="search_derived__search_aggregates__v8",
@ -112,16 +114,14 @@ with DAG(
wait_for_search_derived__search_aggregates__v8
)
wait_for_search_derived__mobile_search_clients_daily__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_search_derived__mobile_search_clients_daily__v1",
external_dag_id="bqetl_mobile_search",
external_task_id="search_derived__mobile_search_clients_daily__v1",
execution_delta=datetime.timedelta(seconds=7200),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_search_derived__mobile_search_clients_daily__v1 = ExternalTaskSensor(
task_id="wait_for_search_derived__mobile_search_clients_daily__v1",
external_dag_id="bqetl_mobile_search",
external_task_id="search_derived__mobile_search_clients_daily__v1",
execution_delta=datetime.timedelta(seconds=7200),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
search_derived__mobile_search_aggregates_for_searchreport__v1.set_upstream(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -85,6 +87,19 @@ with DAG(
arguments=["--schema_update_option=ALLOW_FIELD_ADDITION"],
)
with TaskGroup(
"search_terms_derived__adm_weekly_aggregates__v1_external"
) as search_terms_derived__adm_weekly_aggregates__v1_external:
ExternalTaskMarker(
task_id="adm_export__wait_for_wait_for_adm_weekly_aggregates",
external_dag_id="adm_export",
external_task_id="wait_for_wait_for_adm_weekly_aggregates",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
search_terms_derived__adm_weekly_aggregates__v1_external.set_upstream(
search_terms_derived__adm_weekly_aggregates__v1
)
search_terms_derived__aggregated_search_terms_daily__v1 = bigquery_etl_query(
task_id="search_terms_derived__aggregated_search_terms_daily__v1",
destination_table="aggregated_search_terms_daily_v1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -52,7 +54,7 @@ with DAG(
depends_on_past=False,
)
wait_for_copy_deduplicate_main_ping = ExternalTaskCompletedSensor(
wait_for_copy_deduplicate_main_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate_main_ping",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_main_ping",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -143,6 +145,19 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"mozilla_vpn_derived__all_subscriptions__v1_external"
) as mozilla_vpn_derived__all_subscriptions__v1_external:
ExternalTaskMarker(
task_id="bqetl_mozilla_vpn_site_metrics__wait_for_mozilla_vpn_derived__funnel_ga_to_subscriptions__v1",
external_dag_id="bqetl_mozilla_vpn_site_metrics",
external_task_id="wait_for_mozilla_vpn_derived__funnel_ga_to_subscriptions__v1",
execution_date="{{ (execution_date + macros.timedelta(days=-1, seconds=38700)).isoformat() }}",
)
mozilla_vpn_derived__all_subscriptions__v1_external.set_upstream(
mozilla_vpn_derived__all_subscriptions__v1
)
mozilla_vpn_derived__channel_group_proportions__v1 = bigquery_etl_query(
task_id="mozilla_vpn_derived__channel_group_proportions__v1",
destination_table='channel_group_proportions_v1${{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }}',
@ -507,46 +522,40 @@ with DAG(
email_on_retry=False,
)
wait_for_firefox_accounts_derived__fxa_auth_events__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_auth_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_auth_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_auth_events__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_auth_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_auth_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
cjms_bigquery__flows__v1.set_upstream(
wait_for_firefox_accounts_derived__fxa_auth_events__v1
)
wait_for_firefox_accounts_derived__fxa_content_events__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_content_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_content_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_content_events__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_content_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_content_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
cjms_bigquery__flows__v1.set_upstream(
wait_for_firefox_accounts_derived__fxa_content_events__v1
)
wait_for_firefox_accounts_derived__fxa_stdout_events__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_firefox_accounts_derived__fxa_stdout_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_stdout_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_firefox_accounts_derived__fxa_stdout_events__v1 = ExternalTaskSensor(
task_id="wait_for_firefox_accounts_derived__fxa_stdout_events__v1",
external_dag_id="bqetl_fxa_events",
external_task_id="firefox_accounts_derived__fxa_stdout_events__v1",
execution_delta=datetime.timedelta(seconds=900),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
cjms_bigquery__flows__v1.set_upstream(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -64,6 +66,19 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"telemetry_derived__rolling_cohorts__v1_external"
) as telemetry_derived__rolling_cohorts__v1_external:
ExternalTaskMarker(
task_id="bqetl_analytics_aggregations__wait_for_telemetry_derived__cohort_daily_statistics__v1",
external_dag_id="bqetl_analytics_aggregations",
external_task_id="wait_for_telemetry_derived__cohort_daily_statistics__v1",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
telemetry_derived__rolling_cohorts__v1_external.set_upstream(
telemetry_derived__rolling_cohorts__v1
)
telemetry_derived__unified_metrics__v1 = bigquery_etl_query(
task_id="telemetry_derived__unified_metrics__v1",
destination_table="unified_metrics_v1",
@ -80,26 +95,61 @@ with DAG(
depends_on_past=False,
)
with TaskGroup(
"telemetry_derived__unified_metrics__v1_external"
) as telemetry_derived__unified_metrics__v1_external:
ExternalTaskMarker(
task_id="bqetl_analytics_aggregations__wait_for_active_users_aggregates_device_v1",
external_dag_id="bqetl_analytics_aggregations",
external_task_id="wait_for_active_users_aggregates_device_v1",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_analytics_aggregations__wait_for_telemetry_derived__cohort_daily_statistics__v1",
external_dag_id="bqetl_analytics_aggregations",
external_task_id="wait_for_telemetry_derived__cohort_daily_statistics__v1",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_analytics_aggregations__wait_for_active_users_aggregates_attribution_v1",
external_dag_id="bqetl_analytics_aggregations",
external_task_id="wait_for_active_users_aggregates_attribution_v1",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="bqetl_analytics_aggregations__wait_for_active_users_aggregates_v1",
external_dag_id="bqetl_analytics_aggregations",
external_task_id="wait_for_active_users_aggregates_v1",
execution_date="{{ (execution_date + macros.timedelta(seconds=7200)).isoformat() }}",
)
ExternalTaskMarker(
task_id="kpi_forecasting__wait_for_wait_for_unified_metrics",
external_dag_id="kpi_forecasting",
external_task_id="wait_for_wait_for_unified_metrics",
execution_date="{{ (execution_date + macros.timedelta(seconds=3600)).isoformat() }}",
)
telemetry_derived__unified_metrics__v1_external.set_upstream(
telemetry_derived__unified_metrics__v1
)
telemetry_derived__rolling_cohorts__v1.set_upstream(
telemetry_derived__unified_metrics__v1
)
wait_for_search_derived__mobile_search_clients_daily__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_search_derived__mobile_search_clients_daily__v1",
external_dag_id="bqetl_mobile_search",
external_task_id="search_derived__mobile_search_clients_daily__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_search_derived__mobile_search_clients_daily__v1 = ExternalTaskSensor(
task_id="wait_for_search_derived__mobile_search_clients_daily__v1",
external_dag_id="bqetl_mobile_search",
external_task_id="search_derived__mobile_search_clients_daily__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
telemetry_derived__unified_metrics__v1.set_upstream(
wait_for_search_derived__mobile_search_clients_daily__v1
)
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_last_seen__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_last_seen__v1",
@ -112,16 +162,14 @@ with DAG(
telemetry_derived__unified_metrics__v1.set_upstream(
wait_for_telemetry_derived__clients_last_seen__v1
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = (
ExternalTaskCompletedSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
wait_for_telemetry_derived__core_clients_last_seen__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__core_clients_last_seen__v1",
external_dag_id="bqetl_core",
external_task_id="telemetry_derived__core_clients_last_seen__v1",
execution_delta=datetime.timedelta(seconds=3600),
check_existence=True,
mode="reschedule",
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
telemetry_derived__unified_metrics__v1.set_upstream(

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -63,7 +65,7 @@ with DAG(
depends_on_past=False,
)
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskCompletedSensor(
wait_for_telemetry_derived__clients_daily_joined__v1 = ExternalTaskSensor(
task_id="wait_for_telemetry_derived__clients_daily_joined__v1",
external_dag_id="bqetl_main_summary",
external_task_id="telemetry_derived__clients_daily_joined__v1",

Просмотреть файл

@ -9,6 +9,13 @@ labels:
schedule: daily
scheduling:
dag_name: bqetl_search
external_downstream_tasks:
- task_id: wait_for_search_clients_daily
dag_name: jetstream
execution_delta: 1h
- task_id: wait_for_search_clients_daily
dag_name: operational_monitoring
execution_delta: 1h
bigquery:
time_partitioning:
field: submission_date

Просмотреть файл

@ -13,6 +13,10 @@ labels:
scheduling:
dag_name: bqetl_search
depends_on_past: true
external_downstream_tasks:
- task_id: wait_for_search_clients_last_seen
dag_name: ltv_daily
execution_delta: 1h
bigquery:
time_partitioning:
field: submission_date

Просмотреть файл

@ -20,3 +20,7 @@ bigquery:
scheduling:
dag_name: bqetl_search_terms_daily
arguments: ['--schema_update_option=ALLOW_FIELD_ADDITION']
external_downstream_tasks:
- task_id: wait_for_adm_weekly_aggregates
dag_name: adm_export
execution_delta: 2h

Просмотреть файл

@ -17,6 +17,16 @@ labels:
scheduling:
dag_name: bqetl_main_summary
start_date: '2019-11-05'
external_downstream_tasks:
- task_id: wait_for_clients_daily
dag_name: jetstream
execution_delta: 2h
- task_id: wait_for_clients_daily
dag_name: operational_monitoring
execution_delta: 2h
- task_id: wait_for_clients_daily
dag_name: parquet_export
execution_delta: 1h
bigquery:
time_partitioning:
field: submission_date

Просмотреть файл

@ -23,6 +23,10 @@ scheduling:
- dthorn@mozilla.com
- jklukas@mozilla.com
depends_on_past: true
external_downstream_tasks:
- task_id: wait_for_clients_last_seen
dag_name: taar_daily
execution_delta: 2h
bigquery:
time_partitioning:
field: submission_date

Просмотреть файл

@ -15,3 +15,13 @@ scheduling:
# query to get it, and that would be slow because main_v4 is referenced
referenced_tables: [['moz-fx-data-shared-prod', 'telemetry_stable',
'main_v4']]
external_downstream_tasks:
- task_id: wait_for_main_summary
dag_name: jetstream
execution_delta: 2h
- task_id: wait_for_main_summary
dag_name: operational_monitoring
execution_delta: 2h
- task_id: wait_for_main_summary
dag_name: parquet_export
execution_delta: 1h

Просмотреть файл

@ -25,3 +25,7 @@ scheduling:
# previous day and adding dau from the new day, but this would make each
# query dependent on history. Recreating the table is simpler and more robust.
date_partition_parameter: null
external_downstream_tasks:
- task_id: wait_for_mobile_usage
dag_name: kpi_forecasting
execution_delta: 1h

Просмотреть файл

@ -10,6 +10,10 @@ labels:
incremental: true
scheduling:
dag_name: bqetl_unified
external_downstream_tasks:
- task_id: wait_for_unified_metrics
dag_name: kpi_forecasting
execution_delta: 1h
bigquery:
time_partitioning:
field: submission_date

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -60,7 +62,7 @@ with DAG(
depends_on_past=True,
)
wait_for_task1 = ExternalTaskCompletedSensor(
wait_for_task1 = ExternalTaskSensor(
task_id="wait_for_task1",
external_dag_id="external",
external_task_id="task1",

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -48,3 +50,13 @@ with DAG(
date_partition_parameter="submission_date",
depends_on_past=False,
)
with TaskGroup(
"test__external_table__v1_external"
) as test__external_table__v1_external:
ExternalTaskMarker(
task_id="bqetl_test_dag__wait_for_test__query__v1",
external_dag_id="bqetl_test_dag",
external_task_id="wait_for_test__query__v1",
)
test__external_table__v1_external.set_upstream(test__external_table__v1)

Просмотреть файл

@ -1,7 +1,9 @@
# Generated via https://github.com/mozilla/bigquery-etl/blob/main/bigquery_etl/query_scheduling/generate_airflow_dags.py
from airflow import DAG
from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
import datetime
from utils.gcp import bigquery_etl_query, gke_command
@ -71,7 +73,7 @@ with DAG(
depends_on_past=False,
)
wait_for_test__external_table__v1 = ExternalTaskCompletedSensor(
wait_for_test__external_table__v1 = ExternalTaskSensor(
task_id="wait_for_test__external_table__v1",
external_dag_id="bqetl_external_test_dag",
external_task_id="test__external_table__v1",

Просмотреть файл

@ -4,7 +4,7 @@ from airflow import DAG
from airflow.utils.state import State
import datetime
from operators.gcp_container_operator import GKEPodOperator
from operators.task_sensor import ExternalTaskCompletedSensor
from operators.task_sensor import ExternalTaskSensor
from utils.gcp import gke_command
docs = """
@ -55,20 +55,6 @@ with DAG(
image=docker_image,
)
wait_for_test__non_incremental_query__v1 = ExternalTaskCompletedSensor(
task_id="wait_for_test__non_incremental_query__v1",
external_dag_id="bqetl_core",
external_task_id="test__non_incremental_query__v1",
check_existence=True,
mode="reschedule",
failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
pool="DATA_ENG_EXTERNALTASKSENSOR",
)
export_public_data_json_test__non_incremental_query__v1.set_upstream(
wait_for_test__non_incremental_query__v1
)
public_data_gcs_metadata = gke_command(
task_id="public_data_gcs_metadata",
command=["script/publish_public_data_gcs_metadata"],

Просмотреть файл

@ -317,7 +317,7 @@ class TestDagCollection:
assert result == expected
@pytest.mark.java
def test_to_airflow_with_dependencies(self, tmp_path):
def test_to_airflow_with_upstream_dependencies(self, tmp_path):
query_file_path = tmp_path / "test-project" / "test" / "query_v1"
os.makedirs(query_file_path)
@ -402,7 +402,7 @@ class TestDagCollection:
dags.to_airflow_dags(tmp_path)
expected_dag_with_dependencies = (
expected_dag_with_upstream_dependencies = (
(TEST_DIR / "data" / "dags" / "test_dag_with_dependencies")
.read_text()
.strip()
@ -413,12 +413,14 @@ class TestDagCollection:
.strip()
)
dag_with_dependencies = (tmp_path / "bqetl_test_dag.py").read_text().strip()
dag_with_upstream_dependencies = (
(tmp_path / "bqetl_test_dag.py").read_text().strip()
)
dag_external_dependency = (
(tmp_path / "bqetl_external_test_dag.py").read_text().strip()
)
assert dag_with_dependencies == expected_dag_with_dependencies
assert dag_with_upstream_dependencies == expected_dag_with_upstream_dependencies
assert dag_external_dependency == expected_dag_external_dependency
@pytest.mark.java

Просмотреть файл

@ -403,7 +403,7 @@ class TestTask:
task = Task.of_query(query_file, metadata)
dags = DagCollection.from_dict({})
task.with_dependencies(dags)
task.with_upstream_dependencies(dags)
assert task.dependencies == []
@pytest.mark.java
@ -444,7 +444,7 @@ class TestTask:
}
).with_tasks([task, table_task1, table_task2])
task.with_dependencies(dags)
task.with_upstream_dependencies(dags)
result = task.dependencies
tables = [t.task_id for t in result]
@ -490,7 +490,7 @@ class TestTask:
}
).with_tasks([task, table_task1, table_task2])
task.with_dependencies(dags)
task.with_upstream_dependencies(dags)
result = task.dependencies
tables = [t.task_id for t in result]
@ -545,7 +545,7 @@ class TestTask:
}
).with_tasks([task, table_task1, table_task2])
task.with_dependencies(dags)
task.with_upstream_dependencies(dags)
result = task.dependencies
tables = [t.task_id for t in result]
@ -609,7 +609,7 @@ class TestTask:
}
).with_tasks([task, table_task1, table_task2])
task.with_dependencies(dags)
task.with_upstream_dependencies(dags)
result = task.dependencies
tables = [t.task_id for t in result]

Просмотреть файл

@ -80,4 +80,4 @@ tables = [
]
for name, data in tables:
with (ROOT / name).open("w") as fp:
yaml.dump(data, fp)
yaml.dump(data, fp)