fix: disable catch up and change start dates. (#1712)

This commit is contained in:
Mikaël Ducharme 2023-05-26 11:20:59 -04:00 коммит произвёл GitHub
Родитель e9b2a914a2
Коммит d2fbb73616
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 51 добавлений и 44 удалений

Просмотреть файл

@ -1,10 +1,10 @@
from datetime import datetime
from airflow import DAG
from utils.tags import Tag
from operators.gcp_container_operator import GKEPodOperator
from dags.utils.tags import Tag
DOCS = """\
This DAG is related to data monitoring project it is still under development.
All alerts related to this DAG can be ignored.
@ -39,7 +39,7 @@ TARGET_DATASETS = (
default_args = {
"owner": "akommasani@mozilla.com",
"start_date": datetime(2022, 11, 1),
"start_date": datetime(2023, 5, 26),
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
@ -75,10 +75,9 @@ with DAG(
f"--table={table}",
"--date={{ macros.ds_add(ds, -1) }}",
],
env_vars=dict(
SLACK_BOT_TOKEN="{{ var.value.dim_slack_secret_token }}"),
gcp_conn_id='google_cloud_airflow_gke',
project_id='moz-fx-data-airflow-gke-prod',
cluster_name='workloads-prod-v1',
location='us-west1',
env_vars={"SLACK_BOT_TOKEN": "{{ var.value.dim_slack_secret_token }}"},
gcp_conn_id="google_cloud_airflow_gke",
project_id="moz-fx-data-airflow-gke-prod",
cluster_name="workloads-prod-v1",
location="us-west1",
)

Просмотреть файл

@ -2,20 +2,21 @@ from datetime import datetime, timedelta
from typing import Any, Dict
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from utils.callbacks import retry_tasks_callback
from utils.tags import Tag
from utils.acoustic.acoustic_client import AcousticClient
from dags.utils.acoustic.acoustic_client import AcousticClient
from dags.utils.callbacks import retry_tasks_callback
from dags.utils.tags import Tag
def _generate_acoustic_report(conn_id: str, report_type: str, config: Dict[Any, Any], *args, **kwargs):
"""
A wrapper function for retrieving Acoustic connection details from Airflow instantiating AcousticClient and generating report.
"""
def _generate_acoustic_report(
conn_id: str, report_type: str, config: Dict[Any, Any], *args, **kwargs
):
"""Retrieve Acoustic connection details from Airflow, instantiate AcousticClient and generate report."""
if config["request_params"]["date_start"] == config["request_params"]["date_end"]:
err_msg = "It appears start and end date are exactly the same. This is undesired and will result in data being generated for 0 second time range."
@ -31,7 +32,11 @@ def _generate_acoustic_report(conn_id: str, report_type: str, config: Dict[Any,
}
acoustic_client = AcousticClient(**acoustic_connection)
acoustic_client.generate_report(request_template=config["request_template"], template_params=config["request_params"], report_type=report_type)
acoustic_client.generate_report(
request_template=config["request_template"],
template_params=config["request_params"],
report_type=report_type,
)
return
@ -120,8 +125,8 @@ REPORTS_CONFIG = {
"date_end": EXEC_END,
},
},
"contact_export": {
"request_template": """
"contact_export": {
"request_template": """
<!-- https://developer.goacoustic.com/acoustic-campaign/reference/export-from-a-database -->
<!-- date_format: 07/25/2011 12:12:11 (time is optional) -->
<Envelope>
@ -141,18 +146,18 @@ REPORTS_CONFIG = {
</Body>
</Envelope>
""",
"request_params": {
"list_id": "{{ var.value.fivetran_acoustic_contact_export_list_id }}", # list_name: "Main Contact Table revision 3"
"export_type": "ALL",
"export_format": "CSV",
"visibility": 1, # 0 (Private) or 1 (Shared)
"date_start": EXEC_START,
"date_end": EXEC_END,
"columns": "\n".join([
f"<COLUMN>{column}</COLUMN>" for column in CONTACT_COLUMNS
])
"request_params": {
"list_id": "{{ var.value.fivetran_acoustic_contact_export_list_id }}", # list_name: "Main Contact Table revision 3"
"export_type": "ALL",
"export_format": "CSV",
"visibility": 1, # 0 (Private) or 1 (Shared)
"date_start": EXEC_START,
"date_end": EXEC_END,
"columns": "\n".join(
[f"<COLUMN>{column}</COLUMN>" for column in CONTACT_COLUMNS]
),
},
},
},
}
@ -160,7 +165,7 @@ DEFAULT_ARGS = {
"owner": DAG_OWNER,
"email": [DAG_OWNER],
"depends_on_past": True,
"start_date": datetime(2021, 3, 1),
"start_date": datetime(2023, 5, 26),
"email_on_failure": True,
"email_on_retry": False,
"retries": 1, # at this point we can probably be confident user intervention is required
@ -170,7 +175,7 @@ DEFAULT_ARGS = {
TAGS = [Tag.ImpactTier.tier_1]
for report_type, _config in REPORTS_CONFIG.items():
dag_id = f'fivetran_acoustic_{report_type}'
dag_id = f"fivetran_acoustic_{report_type}"
with DAG(
dag_id=dag_id,
@ -189,21 +194,21 @@ for report_type, _config in REPORTS_CONFIG.items():
)
sync_trigger = FivetranOperator(
task_id='trigger_fivetran_connector',
task_id="trigger_fivetran_connector",
connector_id=f"{{{{ var.value.fivetran_acoustic_{report_type}_connector_id }}}}",
)
sync_wait = FivetranSensor(
task_id='wait_for_fivetran_connector_completion',
task_id="wait_for_fivetran_connector_completion",
connector_id=f"{{{{ var.value.fivetran_acoustic_{report_type}_connector_id }}}}",
poke_interval=30,
xcom="{{ task_instance.xcom_pull('trigger_fivetran_connector') }}",
on_retry_callback=retry_tasks_callback,
params={'retry_tasks': ['trigger_fivetran_connector']},
params={"retry_tasks": ["trigger_fivetran_connector"]},
)
load_completed = EmptyOperator(
task_id='fivetran_load_completed',
task_id="fivetran_load_completed",
)
generate_report >> sync_trigger >> sync_wait >> load_completed

Просмотреть файл

@ -1,6 +1,7 @@
"""
Overwatch
Runs daily at 0700 UTC
Overwatch.
Runs daily at 0700 UTC.
Source code is [overwatch-mvp repository](https://github.com/mozilla/overwatch-mvp/).
@ -14,7 +15,6 @@ from datetime import datetime
from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator
default_args = {
"owner": "gleonard@mozilla.com",
"email": [
@ -27,7 +27,10 @@ default_args = {
"retries": 0,
}
tags = ["repo/telemetry-airflow", "impact/tier_3", ]
tags = [
"repo/telemetry-airflow",
"impact/tier_3",
]
image = "gcr.io/moz-fx-data-airflow-prod-88e0/overwatch:{{ var.value.overwatch_image_version }}"
@ -37,7 +40,7 @@ with DAG(
schedule_interval="0 7 * * *",
doc_md=__doc__,
tags=tags,
catchup=True,
catchup=False,
) as dag:
run_analysis = GKEPodOperator(
task_id="run_analysis",

Просмотреть файл

@ -14,7 +14,7 @@ airflow-provider-fivetran==1.1.2
pytest==6.2.5
pytest-mock==3.10.0
black==22.6.0
ruff==0.0.252
ruff==0.0.269
# Misc
mozlogging

Просмотреть файл

@ -1110,7 +1110,7 @@ rsa==4.8
# via
# -c ./constraints.txt
# google-auth
ruff==0.0.252
ruff==0.0.269
# via -r requirements.in
s3transfer==0.6.0
# via