251 строка
7.5 KiB
Python
251 строка
7.5 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
|
|
from airflow import DAG
|
|
from airflow.hooks.base import BaseHook
|
|
from airflow.operators.empty import EmptyOperator
|
|
from airflow.operators.python import PythonOperator
|
|
from fivetran_provider_async.operators import FivetranOperator
|
|
|
|
from utils.acoustic.acoustic_client import AcousticClient
|
|
from utils.tags import Tag
|
|
|
|
|
|
def _generate_acoustic_report(
|
|
conn_id: str, report_type: str, config: dict[Any, Any], *args, **kwargs
|
|
):
|
|
"""Retrieve Acoustic connection details from Airflow, instantiate AcousticClient and generate report."""
|
|
|
|
if config["request_params"]["date_start"] == config["request_params"]["date_end"]:
|
|
err_msg = "It appears start and end date are exactly the same. This is undesired and will result in data being generated for 0 second time range."
|
|
raise ValueError(err_msg)
|
|
|
|
conn = BaseHook.get_connection(conn_id)
|
|
|
|
acoustic_connection = {
|
|
"base_url": conn.host,
|
|
"client_id": conn.login,
|
|
"client_secret": conn.password,
|
|
"refresh_token": conn.extra_dejson["refresh_token"],
|
|
}
|
|
|
|
acoustic_client = AcousticClient(**acoustic_connection)
|
|
acoustic_client.generate_report(
|
|
request_template=config["request_template"],
|
|
template_params=config["request_params"],
|
|
report_type=report_type,
|
|
)
|
|
|
|
return
|
|
|
|
|
|
DOCS = """
|
|
### fivetran_acoustic
|
|
|
|
#### Description
|
|
|
|
DAG triggers acoustic report generation, then corresponding Fivetran connector to consume the generated report
|
|
from Acoustic SFTP server and waits until its completion.
|
|
|
|
#### Owner
|
|
|
|
kik@mozilla.com
|
|
"""
|
|
|
|
DAG_OWNER = "kik@mozilla.com"
|
|
|
|
ACOUSTIC_CONNECTION_ID = "acoustic"
|
|
|
|
EXEC_START = '{{ macros.ds_format(ds, "%Y-%m-%d", "%m/%d/%Y %H:%M:%S") }}'
|
|
EXEC_END = '{{ macros.ds_format(next_ds, "%Y-%m-%d", "%m/%d/%Y %H:%M:%S") }}'
|
|
|
|
CONTACT_COLUMNS = [
|
|
"email",
|
|
"basket_token",
|
|
"sfdc_id",
|
|
"double_opt_in",
|
|
"has_opted_out_of_email",
|
|
"email_format",
|
|
"email_lang",
|
|
"fxa_created_date",
|
|
"fxa_first_service",
|
|
"fxa_id",
|
|
"fxa_account_deleted",
|
|
"email_id",
|
|
"mailing_country",
|
|
"cohort",
|
|
"sub_mozilla_foundation",
|
|
"sub_common_voice",
|
|
"sub_hubs",
|
|
"sub_mixed_reality",
|
|
"sub_internet_health_report",
|
|
"sub_miti",
|
|
"sub_mozilla_fellowship_awardee_alumni",
|
|
"sub_mozilla_festival",
|
|
"sub_mozilla_technology",
|
|
"sub_mozillians_nda",
|
|
"sub_firefox_accounts_journey",
|
|
"sub_knowledge_is_power",
|
|
"sub_take_action_for_the_internet",
|
|
"sub_test_pilot",
|
|
"sub_firefox_news",
|
|
"vpn_waitlist_geo",
|
|
"vpn_waitlist_platform",
|
|
"sub_about_mozilla",
|
|
"sub_apps_and_hacks",
|
|
"sub_rally",
|
|
"sub_firefox_sweepstakes",
|
|
"relay_waitlist_geo",
|
|
"RECIPIENT_ID",
|
|
"create_timestamp",
|
|
"Last Modified Date",
|
|
]
|
|
|
|
SUPPRESSION_COLUMNS = [
|
|
"Email",
|
|
"Opt In Date",
|
|
"Opted Out",
|
|
"Opt In Details",
|
|
"Email Type",
|
|
"Opted Out Date",
|
|
"Opt Out Details",
|
|
]
|
|
|
|
REPORTS_CONFIG = {
|
|
"raw_recipient_export": {
|
|
"request_template": """
|
|
<Envelope>
|
|
<Body>
|
|
<RawRecipientDataExport>
|
|
<EVENT_DATE_START>{date_start}</EVENT_DATE_START>
|
|
<EVENT_DATE_END>{date_end}</EVENT_DATE_END>
|
|
<MOVE_TO_FTP/>
|
|
<EXPORT_FORMAT>{export_format}</EXPORT_FORMAT>
|
|
<ALL_EVENT_TYPES/>
|
|
<INCLUDE_INBOX_MONITORING/>
|
|
</RawRecipientDataExport>
|
|
</Body>
|
|
</Envelope>
|
|
""",
|
|
"request_params": {
|
|
"export_format": 0,
|
|
"date_start": EXEC_START,
|
|
"date_end": EXEC_END,
|
|
},
|
|
},
|
|
"contact_export": {
|
|
"request_template": """
|
|
<!-- https://developer.goacoustic.com/acoustic-campaign/reference/export-from-a-database -->
|
|
<!-- date_format: 07/25/2011 12:12:11 (time is optional) -->
|
|
<Envelope>
|
|
<Body>
|
|
<ExportList>
|
|
<LIST_ID>{list_id}</LIST_ID>
|
|
<EXPORT_TYPE>{export_type}</EXPORT_TYPE>
|
|
<EXPORT_FORMAT>{export_format}</EXPORT_FORMAT>
|
|
<INCLUDE_RECIPIENT_ID></INCLUDE_RECIPIENT_ID>
|
|
<EXPORT_COLUMNS>
|
|
{columns}
|
|
</EXPORT_COLUMNS>
|
|
<DATE_START>{date_start}</DATE_START>
|
|
<DATE_END>{date_end}</DATE_END>
|
|
<VISIBILITY>{visibility}</VISIBILITY>
|
|
</ExportList>
|
|
</Body>
|
|
</Envelope>
|
|
""",
|
|
"request_params": {
|
|
"list_id": "{{ var.value.fivetran_acoustic_contact_export_list_id }}", # list_name: "Main Contact Table revision 3"
|
|
"export_type": "ALL",
|
|
"export_format": "CSV",
|
|
"visibility": 1, # 0 (Private) or 1 (Shared)
|
|
"date_start": EXEC_START,
|
|
"date_end": EXEC_END,
|
|
"columns": "\n".join(
|
|
[f"<COLUMN>{column}</COLUMN>" for column in CONTACT_COLUMNS]
|
|
),
|
|
},
|
|
},
|
|
"suppression_export": {
|
|
"request_template": """
|
|
<!-- https://developer.goacoustic.com/acoustic-campaign/reference/export-from-a-database -->
|
|
<!-- date_format: 2011-12-20 12:12 PM (time is optional) -->
|
|
<Envelope>
|
|
<Body>
|
|
<ExportList>
|
|
<LIST_ID>{list_id}</LIST_ID>
|
|
<EXPORT_TYPE>{export_type}</EXPORT_TYPE>
|
|
<EXPORT_FORMAT>{export_format}</EXPORT_FORMAT>
|
|
<INCLUDE_RECIPIENT_ID></INCLUDE_RECIPIENT_ID>
|
|
<EXPORT_COLUMNS>
|
|
{columns}
|
|
</EXPORT_COLUMNS>
|
|
<DATE_START>{date_start}</DATE_START>
|
|
<DATE_END>{date_end}</DATE_END>
|
|
<VISIBILITY>{visibility}</VISIBILITY>
|
|
<LIST_DATE_FORMAT>yyyy-MM-dd</LIST_DATE_FORMAT>
|
|
</ExportList>
|
|
</Body>
|
|
</Envelope>
|
|
""",
|
|
"request_params": {
|
|
"list_id": "{{ var.value.fivetran_acoustic_suppression_export_list_id }}", # list_name: "MAIN SUPPRESSION LIST - All"
|
|
"export_type": "ALL",
|
|
"export_format": "CSV",
|
|
"visibility": 1, # 0 (Private) or 1 (Shared)
|
|
"date_start": EXEC_START,
|
|
"date_end": EXEC_END,
|
|
"columns": "\n".join(
|
|
[f"<COLUMN>{column}</COLUMN>" for column in SUPPRESSION_COLUMNS]
|
|
),
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
DEFAULT_ARGS = {
|
|
"owner": DAG_OWNER,
|
|
"email": [DAG_OWNER],
|
|
"depends_on_past": True,
|
|
"start_date": datetime(2024, 3, 13),
|
|
"email_on_failure": True,
|
|
"email_on_retry": False,
|
|
"retries": 1, # at this point we can probably be confident user intervention is required
|
|
"retry_delay": timedelta(minutes=30),
|
|
}
|
|
|
|
TAGS = [Tag.ImpactTier.tier_1]
|
|
|
|
for report_type, _config in REPORTS_CONFIG.items():
|
|
dag_id = f"fivetran_acoustic_{report_type}"
|
|
|
|
with DAG(
|
|
dag_id=dag_id,
|
|
default_args=DEFAULT_ARGS,
|
|
doc_md=DOCS,
|
|
schedule_interval="0 8 * * *", # Fivetran seems to operate in PST timezone (UTC -8)
|
|
tags=TAGS,
|
|
catchup=True,
|
|
max_active_runs=1, # to make sure that we don't attempt to trigger multiple Fivetran when one already running.
|
|
) as dag:
|
|
generate_report = PythonOperator(
|
|
task_id="generate_acoustic_report",
|
|
python_callable=_generate_acoustic_report,
|
|
op_args=[ACOUSTIC_CONNECTION_ID, report_type, _config],
|
|
execution_timeout=timedelta(hours=2),
|
|
)
|
|
|
|
sync_trigger = FivetranOperator(
|
|
task_id="trigger_fivetran_connector",
|
|
connector_id=f"{{{{ var.value.fivetran_acoustic_{report_type}_connector_id }}}}",
|
|
deferrable=False,
|
|
task_concurrency=1,
|
|
)
|
|
|
|
load_completed = EmptyOperator(
|
|
task_id="fivetran_load_completed",
|
|
)
|
|
|
|
generate_report >> sync_trigger >> load_completed
|
|
globals()[dag_id] = dag
|