fix deprecation warnings, clean up and update for 2.3.3
This commit is contained in:
Parent: d2505e9ef3
Commit: 3cc49d4090
airflow.cfg (53 changes)
@@ -15,20 +15,6 @@ dags_folder = $AIRFLOW_HOME/dags
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

-# The SqlAlchemy connection string to the metadata database.
-# SqlAlchemy supports many different database engine, more information
-# their website
-sql_alchemy_conn = $AIRFLOW_DATABASE_URL
-
-# The SqlAlchemy pool size is the maximum number of database connections
-# in the pool.
-sql_alchemy_pool_size = 5
-
-# The SqlAlchemy pool recycle is the number of seconds a connection
-# can be idle in the pool before it is invalidated. This config does
-# not apply to sqlite.
-sql_alchemy_pool_recycle = 3600
-
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
@@ -153,6 +139,22 @@ lazy_load_plugins = True
# loaded from module.
lazy_discover_providers = True

+
+[database]
+# The SqlAlchemy connection string to the metadata database.
+# SqlAlchemy supports many different database engine, more information
+# their website
+sql_alchemy_conn = $AIRFLOW_DATABASE_URL
+
+# The SqlAlchemy pool size is the maximum number of database connections
+# in the pool.
+sql_alchemy_pool_size = 5
+
+# The SqlAlchemy pool recycle is the number of seconds a connection
+# can be idle in the pool before it is invalidated. This config does
+# not apply to sqlite.
+sql_alchemy_pool_recycle = 3600
+
# Number of times the code should be retried in case of DB Operational Errors.
# Not all transactions will be retried as it can cause undesired state.
# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
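Airflow 2.3 moves the metadata-database settings from [core] into the new [database] section; the old keys still work but emit deprecation warnings. A minimal sketch of reading the relocated option through the config API (the actual value comes from $AIRFLOW_DATABASE_URL above):

from airflow.configuration import conf

# Reads from the new [database] section; Airflow falls back to the
# deprecated [core] key (with a DeprecationWarning) if only the old
# location is set.
sql_alchemy_conn = conf.get("database", "sql_alchemy_conn")
print(sql_alchemy_conn)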
@@ -279,7 +281,7 @@ expose_hostname = True
expose_stacktrace = True

# Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
-dag_default_view = tree
+dag_default_view = grid

# Default DAG orientation. Valid values are:
# ``LR`` (Left->Right), ``TB`` (Top->Bottom), ``RL`` (Right->Left), ``BT`` (Bottom->Top)
@@ -596,7 +598,7 @@ num_runs = -1

# The number of seconds to wait between consecutive DAG file processing
# Deprecated since version 2.2.0: The option has been moved to scheduler.scheduler_idle_sleep_time
-processor_poll_interval = 1
+scheduler_idle_sleep_time = 1

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
@@ -754,7 +756,7 @@ enable_experimental_api = False
# How to authenticate users of the API. See
# https://airflow.apache.org/docs/apache-airflow/stable/security.html for possible values.
# ("airflow.api.auth.backend.default" allows all requests for historic reasons)
-auth_backend = airflow.api.auth.backend.deny_all
+auth_backends = airflow.api.auth.backend.session

# Used to set the maximum page limit for API requests
maximum_page_limit = 100
@@ -789,23 +791,6 @@ fallback_page_limit = 100
# access_control_allow_origin =

-
-# [smart_sensor]
-# TODO(hwoo) - Test smart sensors and enable this if the need arises.
-# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
-# smart sensor task.
-# use_smart_sensor = False
-
-# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
-# by `hashcode % shard_code_upper_limit`.
-# shard_code_upper_limit = 10000
-
-# The number of running smart sensor processes for each service.
-# shards = 5
-
-# comma separated sensor classes support in smart_sensor.
-# sensors_enabled = NamedHivePartitionSensor
-
[mesos]
# Mesos master address which MesosExecutor will connect to.
master = localhost:5050
@@ -77,7 +77,7 @@ function main() {

  if [[ $num_errors -ne 0 && $TESTING -eq 0 ]]; then
    # Print full error output
-    docker-compose exec web airflow dags list -v
+    docker-compose exec web airflow dags list-import-errors
    echo "Failure!"
    exit 1
  elif [[ $TESTING -eq 1 ]]; then
@@ -1,5 +1,4 @@
-import gevent
-from gevent import monkey, pool
+from gevent import monkey

monkey.patch_all()

@@ -1,10 +1,7 @@
import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from utils.dataproc import (
    moz_dataproc_pyspark_runner,
    copy_artifacts_dev,
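The import move keeps SubDagOperator working on 2.3, but the operator itself is deprecated upstream in favour of TaskGroup. A hedged sketch of the recommended replacement (the DAG and task ids are illustrative, not from this repo):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG("taskgroup_example", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    start = EmptyOperator(task_id="start")
    # TaskGroup nests tasks in the grid view without spawning a separate sub-DAG run
    with TaskGroup(group_id="processing") as processing:
        step_a = EmptyOperator(task_id="step_a")
        step_b = EmptyOperator(task_id="step_b")
        step_a >> step_b
    start >> processing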
@@ -1,8 +1,7 @@
import datetime

from airflow import DAG
-from airflow.hooks.base_hook import BaseHook
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.hooks.base import BaseHook
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
@@ -1,15 +1,13 @@
-import ast
import datetime
-from typing import Optional
from enum import Enum

from airflow.decorators import dag
+from airflow.models import DagModel
+from airflow.models.param import Param
from airflow.operators.bash import BashOperator
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
-from airflow.models import DagModel
-

from utils.backfill import BackfillParams
from utils.tags import Tag
@@ -22,29 +20,18 @@ class TaskId(Enum):
    do_not_clear_tasks = "do_not_clear_tasks"


-def __parse_string_params(string_params: str) -> Optional[BackfillParams]:
-    """
-    dag_run.conf is string representation of a Python dictionary in Airflow 2.1
-    ast.literal_eval() is used to convert from string to dictionary as a workaround
-    this workaround will no longer be required in Airflow >=2.2, see link below for future implementation
-    https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html
-    """
-    params_parsed = ast.literal_eval(string_params)
-    return BackfillParams(**params_parsed)
-
-
-def dry_run_branch_callable(params: str) -> str:
-    backfill_params = __parse_string_params(params)
+def dry_run_branch_callable(params: dict) -> str:
+    backfill_params = BackfillParams(**params)
    return TaskId.dry_run.value if backfill_params.dry_run else TaskId.real_deal.value


-def clear_branch_callable(params: str) -> str:
-    backfill_params = __parse_string_params(params)
+def clear_branch_callable(params: dict) -> str:
+    backfill_params = BackfillParams(**params)
    return TaskId.clear_tasks.value if backfill_params.clear else TaskId.do_not_clear_tasks.value


-def param_validation(params: str) -> bool:
-    backfill_params = __parse_string_params(params)
+def param_validation(params: dict) -> bool:
+    backfill_params = BackfillParams(**params)
    backfill_params.validate_date_range()
    validate_dag_exists(dag_name=backfill_params.dag_name)
    backfill_params.validate_regex_pattern()
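From Airflow 2.2 onward, params reach the python_callables as a real dictionary, so the ast.literal_eval workaround above can go. A minimal sketch of the new calling convention (the dataclass is a stand-in for this repo's utils.backfill.BackfillParams):

from dataclasses import dataclass


@dataclass
class BackfillParams:  # stand-in for utils.backfill.BackfillParams
    dag_name: str
    dry_run: bool


def dry_run_branch_callable(params: dict) -> str:
    # params arrives as a dict on Airflow >= 2.2; no string parsing needed
    backfill_params = BackfillParams(**params)
    return "dry_run" if backfill_params.dry_run else "real_deal"


print(dry_run_branch_callable({"dag_name": "some_dag", "dry_run": True}))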
@@ -57,8 +44,8 @@ def validate_dag_exists(dag_name: str) -> None:
        raise ValueError(f"`dag_name`={dag_name} does not exist")


-def generate_bash_command(params: str) -> str:
-    backfill_params = __parse_string_params(params)
+def generate_bash_command(params: dict) -> str:
+    backfill_params = BackfillParams(**params)
    return " ".join(backfill_params.generate_backfill_command())

@@ -85,12 +72,17 @@ doc_md = """
    start_date=datetime.datetime(2022, 11, 1),
    dagrun_timeout=datetime.timedelta(days=1),
    tags=[Tag.ImpactTier.tier_3],
-    params={"dag_name": "dag_name",
-            "start_date": (datetime.date.today() - datetime.timedelta(days=10)).isoformat(),
-            "end_date": datetime.date.today().isoformat(),
-            "clear": False,
-            "dry_run": True,
-            "task_regex": None,
+    render_template_as_native_obj=True,
+    params={"dag_name": Param("dag_name", type="string"),
+            "start_date": Param((datetime.date.today() - datetime.timedelta(days=10)).isoformat(),
+                                type="string",
+                                format="date-time"),
+            "end_date": Param(datetime.date.today().isoformat(),
+                              type="string",
+                              format="date-time"),
+            "clear": Param(False, type="boolean"),
+            "dry_run": Param(True, type="boolean"),
+            "task_regex": Param(None, type=["string", "null"]),
            }
)
def backfill_dag():
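Param objects give the trigger-DAG form JSON-schema validation, and render_template_as_native_obj=True makes templated values arrive as native Python objects rather than strings. A small self-contained sketch of the same pattern (the DAG id and task are illustrative):

import datetime

from airflow.decorators import dag
from airflow.models.param import Param
from airflow.operators.python import PythonOperator


@dag(
    dag_id="param_example",  # illustrative name
    start_date=datetime.datetime(2022, 11, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # templates resolve to native objects
    params={
        "dry_run": Param(True, type="boolean"),
        "task_regex": Param(None, type=["string", "null"]),
    },
)
def param_example():
    def show(params):
        # Airflow injects the params context variable based on the signature;
        # params["dry_run"] is a real bool here, not the string "True"
        print(params["dry_run"], params["task_regex"])

    PythonOperator(task_id="show", python_callable=show)


param_example()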
@@ -107,8 +99,8 @@ def backfill_dag():
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

-    dry_run_task = DummyOperator(task_id=TaskId.dry_run.value)
-    real_deal_task = DummyOperator(task_id=TaskId.real_deal.value)
+    dry_run_task = EmptyOperator(task_id=TaskId.dry_run.value)
+    real_deal_task = EmptyOperator(task_id=TaskId.real_deal.value)

    clear_branch_task = BranchPythonOperator(
        task_id="clear_parameter",
@@ -117,8 +109,8 @@ def backfill_dag():
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

-    clear_tasks_task = DummyOperator(task_id=TaskId.clear_tasks.value)
-    do_not_clear_tasks_task = DummyOperator(task_id=TaskId.do_not_clear_tasks.value)
+    clear_tasks_task = EmptyOperator(task_id=TaskId.clear_tasks.value)
+    do_not_clear_tasks_task = EmptyOperator(task_id=TaskId.do_not_clear_tasks.value)

    generate_backfill_command_task = PythonOperator(
        task_id="generate_backfill_command",
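DummyOperator is deprecated in 2.3 in favour of EmptyOperator; it is a pure rename, since both simply succeed without running anything:

from airflow.operators.empty import EmptyOperator  # was airflow.operators.dummy.DummyOperator

# Behaviour is identical: the task completes successfully and runs nothing.
placeholder = EmptyOperator(task_id="placeholder")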
@@ -13,7 +13,7 @@ import datetime

from airflow import DAG
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta

from airflow import DAG
-from airflow.utils.dates import days_ago
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from utils.tags import Tag
@@ -6,8 +6,6 @@ from airflow.utils.task_group import TaskGroup
-from utils.gcp import (
-    bigquery_etl_copy_deduplicate,
-    bigquery_etl_query,
-    gke_command,
-    bigquery_xcom_query,
-)
+from utils.gcp import gke_command

@@ -9,7 +9,7 @@ Uses crash report data imported from Socorro.
import datetime

from airflow import DAG
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor

@@ -1,9 +1,8 @@
from datetime import datetime

from airflow import DAG
-from datetime import datetime, timedelta
from airflow.models import Variable

from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import bigquery_etl_query, gke_command

DOCS = """\
This DAG is related to data monitoring project it is still under development.
@@ -1,12 +1,10 @@
from airflow import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from utils.tags import Tag

from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow import DAG
from airflow.operators.bash import BashOperator

from utils.slack import if_task_fails_alert_slack
from utils.tags import Tag

"""
If getting "channel_not_found" errors, you need to open the slack channel settings, navigate to Integrations,
@@ -6,13 +6,11 @@ This DAG exports views related to experiment monitoring to GCS as JSON
every 5 minutes to power the Experimenter console.
"""

from datetime import datetime

from airflow import DAG
-from datetime import datetime, timedelta
-from utils.gcp import bigquery_etl_query, gke_command

from airflow.sensors.external_task import ExternalTaskSensor
from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import gke_command
from utils.tags import Tag

default_args = {
@@ -29,9 +27,7 @@ tags = [Tag.ImpactTier.tier_2]
with DAG(
    'experiments_live',
    default_args=default_args,
-    # Will be renamed to max_active_tasks sometime later as main upstream branch states
-    # max_active_tasks=4,
-    concurrency=4,
+    max_active_tasks=4,
    max_active_runs=1,
    schedule_interval="*/5 * * * *",
    doc_md=__doc__,
@@ -8,7 +8,7 @@ Source code is in the [firefox-public-data-report-etl repository]
from airflow import DAG
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from datetime import datetime, timedelta
from operators.gcp_container_operator import GKEPodOperator

@@ -2,11 +2,9 @@ from datetime import datetime, timedelta
from typing import Any, Dict

from airflow import DAG
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.python_operator import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from utils.tags import Tag
@@ -200,7 +198,7 @@ for report_type, _config in REPORTS_CONFIG.items():
        poke_interval=30
    )

-    load_completed = DummyOperator(
+    load_completed = EmptyOperator(
        task_id='fivetran_load_completed',
    )

@@ -11,9 +11,7 @@ in telemetry-airflow.
from datetime import datetime, timedelta

from airflow import DAG
-from operators.gcp_container_operator import GKENatPodOperator
from airflow.models import Variable
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
@@ -13,12 +13,9 @@ in telemetry-airflow.
from datetime import datetime, timedelta

from airflow import DAG
from operators.gcp_container_operator import GKENatPodOperator
-from operators.task_sensor import ExternalTaskCompletedSensor
from airflow.models import Variable
-from airflow.operators.subdag_operator import SubDagOperator
-
-from glam_subdags.extract import extracts_subdag, extract_user_counts
+from airflow.operators.subdag import SubDagOperator

from glam_subdags.histograms import histogram_aggregates_subdag
from glam_subdags.general import repeated_subdag
from glam_subdags.generate_query import generate_and_run_desktop_query
@@ -60,7 +57,7 @@ dag = DAG(

""" This isn't needed because the dev dag will only be triggered manually
# Make sure all the data for the given day has arrived before running.
-wait_for_main_ping = ExternalTaskCompletedSensor(
+wait_for_main_ping = ExternalTaskSensor(
    task_id="wait_for_main_ping",
    external_dag_id="copy_deduplicate",
    external_task_id="copy_deduplicate_main_ping",
@@ -10,7 +10,7 @@ in telemetry-airflow.
from datetime import datetime, timedelta

from airflow import DAG
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
@@ -92,7 +92,7 @@ with DAG(
        email_on_retry=False,
    )

-    pre_import = DummyOperator(
+    pre_import = EmptyOperator(
        task_id='pre_import',
    )

@@ -1,7 +1,7 @@
from datetime import datetime, timedelta

from airflow import DAG
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
@@ -75,7 +75,7 @@ with DAG(
        email_on_retry=False,
    )

-    pre_import = DummyOperator(
+    pre_import = EmptyOperator(
        task_id=f'pre_import',
    )

@@ -12,7 +12,7 @@ import datetime
import os

from airflow import DAG
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor

@@ -1,7 +1,7 @@
from datetime import datetime, timedelta

from airflow import DAG
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
from operators.backport.fivetran.operator import FivetranOperator
from operators.backport.fivetran.sensor import FivetranSensor
from utils.tags import Tag
@@ -59,13 +59,13 @@ with DAG(
    tags=tags,
) as dag:

-    fivetran_sensors_complete = DummyOperator(
+    fivetran_sensors_complete = EmptyOperator(
        task_id='intacct-fivetran-sensors-complete',
    )

    for index, (location, connector_id) in enumerate(list_of_connectors.items()):

-        fivetran_sync = DummyOperator(task_id=f'intacct-{location}')
+        fivetran_sync = EmptyOperator(task_id=f'intacct-{location}')

        # In order to avoid hitting DAG concurrency limits by sensor tasks below,
        # sync tasks here have variable priority weights
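The variable priority weights mentioned in the comment are plain BaseOperator functionality; a hedged sketch with illustrative values:

from airflow.operators.empty import EmptyOperator

index = 0  # comes from the enumerate() loop above; fixed here for illustration
fivetran_sync = EmptyOperator(
    task_id="intacct-example",
    # Higher weight = scheduled earlier, so sensors for early connectors
    # don't starve the DAG's concurrency slots (weighting scheme hypothetical).
    priority_weight=100 - index,
)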
dags/ltv.py (10 changes)
@@ -6,19 +6,11 @@ Kicks off jobs to run on a Dataproc cluster. The job code lives in

See [client_ltv docs on DTMO](https://docs.telemetry.mozilla.org/datasets/search/client_ltv/reference.html).
"""

import json
import os

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from datetime import datetime, timedelta

-from airflow.providers.google.cloud.operators.bigquery import (
-    BigQueryExecuteQueryOperator
-)
-from six.moves.urllib.request import urlopen
from utils.dataproc import (
    moz_dataproc_pyspark_runner,
    copy_artifacts_dev,
@@ -4,14 +4,12 @@ Aggregates that power the legacy telemetry

See [python_mozaggregator](https://github.com/mozilla/python_mozaggregator).
"""

import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
-
-

from utils.dataproc import copy_artifacts_dev, moz_dataproc_pyspark_runner
@@ -5,13 +5,12 @@ Aggregates that power the legacy telemetry
See [python_mozaggregator](https://github.com/mozilla/python_mozaggregator).
"""

import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
-
from utils.dataproc import moz_dataproc_pyspark_runner, copy_artifacts_dev
from utils.gcp import gke_command
from utils.tags import Tag
@@ -1,10 +1,10 @@
+from datetime import timedelta, datetime
+
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import timedelta, datetime
-from utils.gcp import gke_command
-from utils.tags import Tag
-from operators.gcp_container_operator import GKEPodOperator

+from operators.gcp_container_operator import GKEPodOperator
+from utils.tags import Tag

docs = """
### operational_monitoring
@@ -12,6 +12,10 @@ class RegistryLink(BaseOperatorLink):

    name = "Astronomer Registry"

+    # Note that this may break: get_link no longer supports dttm as an arg in v2.3.
+    # Instead it expects a ti_key. Looks like fivetran hasn't released a
+    # new airflow-provider-fivetran>1.1.2 yet.
+    # More details at https://github.com/apache/airflow/pull/21798
    def get_link(self, operator, dttm):
        """Get link to registry page."""
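For reference, operator extra links in Airflow 2.3 receive a ti_key keyword instead of dttm (apache/airflow#21798). A hedged sketch of the newer signature, assuming a provider built against 2.3+ (the URL is illustrative):

from airflow.models.baseoperator import BaseOperatorLink
from airflow.models.taskinstance import TaskInstanceKey


class RegistryLinkV23(BaseOperatorLink):
    """Sketch of the post-2.3 operator-link API."""

    name = "Astronomer Registry"

    # Airflow 2.3+ passes the task-instance key instead of an execution date.
    def get_link(self, operator, *, ti_key: TaskInstanceKey) -> str:
        return "https://registry.astronomer.io"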
@@ -1,122 +0,0 @@
-import datetime
-import os
-
-from sqlalchemy import func
-
-from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
-from airflow.sensors.external_task import ExternalTaskSensor
-from airflow.utils.db import provide_session
-from airflow.utils.state import State
-
-
-"""
-Custom ExternalTaskSensor implementation that also checks for failed states.
-Once we update to Airflow 2, this implementation can be deprecated in favour of ExternalTaskSensor.
-
-Based on https://github.com/apache/airflow/blob/v1-10-stable/airflow/sensors/external_task_sensor.py
-"""
-
-class ExternalTaskCompletedSensor(ExternalTaskSensor):
-    """
-    We override __init__ and poke methods to support:
-    - Checking external upstream tasks for failed states (e.g. State.FAILED, State.UPSTREAM_FAILED, ...)
-      to stop the ExternalTaskSensor from being rescheduled indefinitely.
-    - A new parameter `failed_states` has been added to define the states which indicate that the
-      external task has failed.
-    """
-
-    def __init__(self, failed_states=None, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.failed_states = failed_states or [State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED]
-
-    @provide_session
-    def poke(self, context, session=None):
-        # implementation copied from https://github.com/apache/airflow/blob/v1-10-stable/airflow/sensors/external_task_sensor.py
-
-        if self.execution_delta:
-            dttm = context['execution_date'] - self.execution_delta
-        elif self.execution_date_fn:
-            # Moz specific - _handle_execution_date_fn may not be defined in this context
-            raise AirflowException("execution_date_fn is not supported by this custom mozilla sensor.")
-        else:
-            dttm = context['execution_date']
-
-        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
-        serialized_dttm_filter = ','.join(
-            [datetime.isoformat() for datetime in dttm_filter])
-
-        self.log.info(
-            'Poking for %s.%s on %s ... ',
-            self.external_dag_id, self.external_task_id, serialized_dttm_filter
-        )
-
-        DM = DagModel
-        TI = TaskInstance
-        DR = DagRun
-        if self.check_existence:
-            dag_to_wait = session.query(DM).filter(
-                DM.dag_id == self.external_dag_id
-            ).first()
-
-            if not dag_to_wait:
-                raise AirflowException('The external DAG '
-                                       '{} does not exist.'.format(self.external_dag_id))
-            else:
-                if not os.path.exists(dag_to_wait.fileloc):
-                    raise AirflowException('The external DAG '
-                                           '{} was deleted.'.format(self.external_dag_id))
-
-            if self.external_task_id:
-                refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
-                if not refreshed_dag_info.has_task(self.external_task_id):
-                    raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
-
-        # custom implementation to check for failed tasks
-        if self.external_task_id:
-            # Moz specific - rename count to count_allowed
-            # .count() is inefficient
-            count_allowed = session.query(func.count()).filter(
-                TI.dag_id == self.external_dag_id,
-                TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
-                TI.execution_date.in_(dttm_filter),
-            ).scalar()
-
-            # Moz specific - counting failed upstream states
-            count_failed = session.query(func.count()).filter(
-                TI.dag_id == self.external_dag_id,
-                TI.task_id == self.external_task_id,
-                TI.state.in_(self.failed_states),
-                TI.execution_date.in_(dttm_filter),
-            ).scalar()
-        else:
-            # Moz specific - rename count to count_allowed
-            # .count() is inefficient
-            count_allowed = session.query(func.count()).filter(
-                DR.dag_id == self.external_dag_id,
-                DR.state.in_(self.allowed_states),
-                DR.execution_date.in_(dttm_filter),
-            ).scalar()
-
-            # Moz specific - counting failed upstream states
-            count_failed = session.query(func.count()).filter(
-                DR.dag_id == self.external_dag_id,
-                DR.state.in_(self.failed_states),
-                DR.execution_date.in_(dttm_filter),
-            ).scalar()
-
-        # Moz specific - set sensor to failed state if external task has failed
-        if count_failed == len(dttm_filter):
-            if self.external_task_id:
-                raise AirflowException(
-                    f'The external task {self.external_task_id} in DAG {self.external_dag_id} failed.'
-                )
-            else:
-                raise AirflowException(f'The external DAG {self.external_dag_id} failed.')
-
-        session.commit()
-        return count_allowed == len(dttm_filter)
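The custom sensor above can be deleted because Airflow 2's built-in ExternalTaskSensor accepts failed_states directly and fails fast instead of rescheduling forever. A minimal sketch of the built-in equivalent (ids mirror the glam hunk earlier):

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

wait_for_main_ping = ExternalTaskSensor(
    task_id="wait_for_main_ping",
    external_dag_id="copy_deduplicate",
    external_task_id="copy_deduplicate_main_ping",
    allowed_states=[State.SUCCESS],
    # mirrors the deleted sensor's default failed states
    failed_states=[State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED],
    mode="reschedule",
)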
@@ -8,17 +8,14 @@ which would allow us to tear down this DAG.

from airflow import DAG
from datetime import datetime, timedelta
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.gcp import (
    bigquery_etl_query,
    bigquery_etl_copy_deduplicate,
    export_to_parquet,
    gke_command,
)
from utils.tags import Tag

@@ -4,10 +4,10 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.models import Variable
-from airflow.operators.dummy import DummyOperator
-from airflow.operators.http_operator import SimpleHttpOperator
-from airflow.operators.python_operator import PythonOperator
-from airflow.operators.branch_operator import BaseBranchOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.providers.http.operators.http import SimpleHttpOperator
+from airflow.operators.python import PythonOperator
+from airflow.operators.branch import BaseBranchOperator
from airflow.utils.weekday import WeekDay
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
@@ -81,7 +81,7 @@ with DAG('probe_scraper',

    # probe scraper used to be a single task, but it has been split up, and individual
    # failures do not block downstream tasks
-    probe_scraper = DummyOperator(
+    probe_scraper = EmptyOperator(
        task_id="probe_scraper",
        trigger_rule="all_done",
        dag=dag,
@@ -243,7 +243,7 @@ with DAG('probe_scraper',
        )
        for check_name in ("check-expiry", "check-fog-expiry")
    ]
-    dummy_branch = DummyOperator(
+    dummy_branch = EmptyOperator(
        task_id="dummy_branch",
        dag=dag,
    )

@@ -12,9 +12,8 @@ This DAG is low priority.
from datetime import datetime, timedelta

from airflow import DAG
-
-from utils.gcp import gke_command
+from airflow.sensors.external_task import ExternalTaskSensor
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag

default_args = {
@@ -1,5 +1,5 @@
from airflow import DAG
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator

from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceS3ToGCSOperator,
@@ -12,9 +12,8 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.sensors.external_task import ExternalTaskSensor
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.models import Variable
-from itertools import chain

from operators.gcp_container_operator import GKEPodOperator  # noqa
from utils.constants import ALLOWED_STATES, FAILED_STATES
@@ -6,7 +6,7 @@ For context, see https://github.com/mozilla/taar
from airflow import DAG
from datetime import datetime, timedelta
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from airflow.models import Variable

from operators.gcp_container_operator import GKEPodOperator  # noqa
@@ -6,7 +6,7 @@ See [jobs/update_orphaning_dashboard_etl.py](https://github.com/mozilla/telemetr
from airflow import DAG
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators.subdag import SubDagOperator
from datetime import datetime, timedelta

from utils.constants import DS_WEEKLY
@@ -4,7 +4,7 @@ from collections import namedtuple
from airflow import models
from airflow.exceptions import AirflowException
-from airflow.operators.bash_operator import BashOperator
+from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# When google deprecates dataproc_v1beta2 in DataprocHook/Operator classes
@@ -1,8 +1,6 @@
from airflow import models
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.subdag_operator import SubDagOperator

-from operators.gcp_container_operator import GKEPodOperator
+from dags.operators.gcp_container_operator import GKEPodOperator

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -17,7 +15,7 @@ from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator

-from utils.dataproc import get_dataproc_parameters
+from dags.utils.dataproc import get_dataproc_parameters

import json
import re
One file's diff is not shown because of its large size.
@@ -526,7 +526,7 @@ def main(
    )

    if num_donors < 100:
-        logger.warn(
+        logger.warning(
            "Less than 100 donors were requested.", extra={"donors": num_donors}
        )
        num_donors = 100
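Logger.warn is only a deprecated alias; warning is the canonical method, so this is a drop-in rename:

import logging

logger = logging.getLogger(__name__)
logger.warning("Less than 100 donors were requested.", extra={"donors": 42})
# logger.warn(...) still works but is documented as deprecated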
@@ -16,7 +16,7 @@ telemetry_airflow = {
wtmo_dev = {
    "name": "WTMO Developer Guide",
    "category": "Mozilla",
-    "href": "https://mana.mozilla.org/wiki/display/DOPS/WTMO+Developer+Guide"
+    "href": "https://mozilla-hub.atlassian.net/wiki/spaces/SRE/pages/27922811/WTMO+Developer+Guide"
}

airflow_triage_guide = {
@@ -18,16 +18,17 @@
# under the License.

import os

from airflow import configuration as conf
from flask_appbuilder.security.manager import AUTH_DB
# from flask_appbuilder.security.manager import AUTH_LDAP
from flask_appbuilder.security.manager import AUTH_OAUTH

# from flask_appbuilder.security.manager import AUTH_OID
# from flask_appbuilder.security.manager import AUTH_REMOTE_USER
basedir = os.path.abspath(os.path.dirname(__file__))

# The SQLAlchemy connection string.
-SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')
+SQLALCHEMY_DATABASE_URI = conf.conf.get('core', 'SQL_ALCHEMY_CONN')

# Flask-WTF flag for CSRF
CSRF_ENABLED = True
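Because conf here is the airflow.configuration module, the parser instance lives at conf.conf; the module-level get() wrapper is deprecated. An equivalent, clearer form (and, per the airflow.cfg hunks above, the option itself now lives under [database]):

from airflow.configuration import conf  # AirflowConfigParser instance

# Same lookup without the deprecated module-level wrapper.
SQLALCHEMY_DATABASE_URI = conf.get('database', 'SQL_ALCHEMY_CONN')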