from datetime import datetime, timedelta

from airflow import DAG

from timetable import MultiWeekTimetable
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

docs = """
### shredder

#### Description

These jobs normally need to be restarted many times, because each query is
only attempted once per run. `main_v4` and `main_summary_v4` in particular
have partitions that fail often due to a combination of size, schema, and
clustering. In most cases failed jobs may simply be restarted.

Logs from failed runs are not available in Airflow, because Kubernetes Pods
are deleted on exit. Instead, logs can be found in Google Cloud Logging:

- [shredder-flat-rate-main-summary](https://cloudlogging.app.goo.gl/Tv68VKpCR9fzbJNGA)
- [shredder-flat-rate](https://cloudlogging.app.goo.gl/Uu6VRn34VY4AryGJ9)
- [on-demand](https://cloudlogging.app.goo.gl/GX1GM9hwZMENNnnq8)

Kubernetes Pods are deleted on exit to prevent multiple running instances.
Multiple running instances will submit redundant queries, because state is
only read at the start of each run. This may cause queries to time out,
because only a few may run in parallel while the rest are queued.

#### Owner

akomar@mozilla.com
"""

default_args = {
    "owner": "akomar@mozilla.com",
    "depends_on_past": True,
    "start_date": datetime(2023, 5, 16),
    "catchup": False,
    "email": [
        "telemetry-alerts@mozilla.com",
        "akomar@mozilla.com",
        "bewu@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 44,
    "retry_delay": timedelta(minutes=5),
}

tags = [
    Tag.ImpactTier.tier_2,
    Tag.Triage.no_triage,
]

dag = DAG(
    "shredder",
    default_args=default_args,
    # 4 week intervals from start_date. This is similar to
    # schedule_interval=timedelta(days=28), except it should actually work.
    schedule=MultiWeekTimetable(num_weeks=4),
    doc_md=docs,
    tags=tags,
)

docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"

base_command = [
    "script/shredder_delete",
    "--state-table=moz-fx-data-shredder.shredder_state.shredder_state",
    "--task-table=moz-fx-data-shredder.shredder_state.tasks",
    # dags run one schedule interval after ds, so the end date should be one day
    # before the dag runs. schedule intervals are 4 weeks = 28 days, so the end
    # date is 28-1 = 27 days after ds.
    "--end-date={{macros.ds_add(ds, 27)}}",
    # start date should be two schedule intervals before the end date, to avoid
    # race conditions with downstream tables and pings received shortly after a
    # deletion request. schedule intervals are 4 weeks = 28 days.
    # This is temporarily increased to 4 intervals, in order to handle the
    # outstanding backlog.
    "--start-date={{macros.ds_add(ds, 27-28*4)}}",
    # non-dml statements use LEFT JOIN instead of IN to filter rows, which takes about
    # half as long as of 2022-02-14, and reduces cost by using less flat rate slot time
    "--no-use-dml",
]

# Handle telemetry main and main use counter separately to ensure they run
# continuously and don't slow down other tables. Run them in a separate project
# with their own slot reservation to ensure they can finish on time, because
# they use more slots than everything else combined.
telemetry_main = GKEPodOperator(
    task_id="telemetry_main",
    name="shredder-telemetry-main",
    arguments=[
        *base_command,
        "--parallelism=2",
        "--billing-project=moz-fx-data-shredder",
        "--only=telemetry_stable.main_v5",
    ],
    image=docker_image,
    is_delete_operator_pod=True,
    reattach_on_restart=True,
    dag=dag,
)

telemetry_main_use_counter = GKEPodOperator(
    task_id="telemetry_main_use_counter",
    name="shredder-telemetry-main-use-counter",
    arguments=[
        *base_command,
        "--parallelism=2",
        "--billing-project=moz-fx-data-shredder",
        "--only=telemetry_stable.main_use_counter_v4",
    ],
    image=docker_image,
    is_delete_operator_pod=True,
    reattach_on_restart=True,
    dag=dag,
)

# everything else
flat_rate = GKEPodOperator(
    task_id="all",
    name="shredder-all",
    arguments=[
        *base_command,
        "--parallelism=4",
        "--billing-project=moz-fx-data-bq-batch-prod",
        "--except",
        "telemetry_stable.main_v5",
        "telemetry_stable.main_use_counter_v4",
    ],
    image=docker_image,
    is_delete_operator_pod=True,
    reattach_on_restart=True,
    # Needed to scale the highmem pool from 0 -> 1, because cluster autoscaling
    # works on pod resource requests, instead of usage
    container_resources={
        "request_memory": "13312Mi",
        "request_cpu": None,
        "limit_memory": "20480Mi",
        "limit_cpu": None,
        "limit_gpu": None,
    },
    # This job was being killed by Kubernetes for using too much memory,
    # thus the highmem node pool
    node_selector={"nodepool": "highmem"},
    # Give additional time since we may need to scale up when running this job
    startup_timeout_seconds=360,
    dag=dag,
)

experiments = GKEPodOperator(
    task_id="experiments",
    name="shredder-experiments",
    arguments=[
        *base_command,
        "--parallelism=6",
        "--billing-project=moz-fx-data-bq-batch-prod",
        "--environment=experiments",
    ],
    image=docker_image,
    is_delete_operator_pod=True,
    reattach_on_restart=True,
    dag=dag,
)
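

# Illustrative sketch only, not invoked by Airflow: a quick check of the deletion
# window arithmetic used in base_command above. In a real run Airflow renders the
# ds macros; the logical date below is a hypothetical example (the DAG start_date).
if __name__ == "__main__":
    from datetime import date

    ds = date(2023, 5, 16)  # hypothetical logical date for one 4-week interval
    end_date = ds + timedelta(days=27)  # one day before the next scheduled run
    start_date = ds + timedelta(days=27 - 28 * 4)  # four intervals before end_date
    print(f"--end-date={end_date}")  # --end-date=2023-06-12
    print(f"--start-date={start_date}")  # --start-date=2023-02-20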