# telemetry-airflow/dags/dap_collector.py
#
# 76 lines
# 2.0 KiB
# Python

from datetime import datetime, timedelta
from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
# Markdown description of this DAG; handed to the DAG constructor below as
# `doc_md`. It also lists the Airflow variables the task's arguments read.
DOCS = """
### DAP Collector
#### Description
Runs a Docker image that collects data from a DAP (Distributed Aggregation Protocol) leader and stores it in BigQuery.
The container is defined in
[docker-etl](https://github.com/mozilla/docker-etl/tree/main/jobs/dap-collector)
For more information on Privacy Preserving Measurement in Firefox see
https://bugzilla.mozilla.org/show_bug.cgi?id=1775035
This DAG requires following variables to be defined in Airflow:
* dap_auth_token
* dap_hpke_private_key
* dap_task_config_url
This job is under active development, occasional failures are expected.
#### Owner
sfriedberger@mozilla.com
"""
# Passed to the DAG constructor as `default_args`.
default_args = {
    # Ownership and notification recipients.
    "owner": "sfriedberger@mozilla.com",
    "email": [
        "akomarzewski@mozilla.com",
        "sfriedberger@mozilla.com",
    ],
    # Each run is independent of the outcome of the previous one.
    "depends_on_past": False,
    "start_date": datetime(2023, 3, 8),
    # Send mail both when a task fails and when it is retried.
    "email_on_failure": True,
    "email_on_retry": True,
    # A single retry, two hours after the failed attempt.
    "retries": 1,
    "retry_delay": timedelta(hours=2),
}
# Values forwarded to the collector via its `--project` and `--table-id`
# command-line arguments.
project_id = "moz-fx-data-shared-prod"
table_id = "dap_collector_derived.aggregates_v1"

# Tags attached to the DAG (impact tier and triage status).
tags = [Tag.ImpactTier.tier_3, Tag.Triage.no_triage]
# Daily DAG with a single task that runs the dap-collector container image
# through GKEPodOperator.
with DAG(
    "dap_collector",
    default_args=default_args,
    doc_md=DOCS,
    schedule_interval="@daily",
    tags=tags,
) as dag:
    # The operator has to be created inside the `with` body: that is what
    # associates the task with this DAG.
    dap_collector = GKEPodOperator(
        task_id="dap_collector",
        arguments=[
            "python",
            "dap_collector/main.py",
            # Templated values: `ds` is the run's date stamp and the
            # `var.value.*` entries are the Airflow variables listed in DOCS.
            "--date={{ ds }}",
            # NOTE(review): the auth token and the HPKE private key are passed
            # as plain command-line arguments. Consider whether they should be
            # supplied another way (e.g. environment/secret) -- this needs
            # support in the container's entry point, so confirm before
            # changing.
            "--auth-token={{ var.value.dap_auth_token }}",
            "--hpke-private-key={{ var.value.dap_hpke_private_key }}",
            "--task-config-url={{ var.value.dap_task_config_url }}",
            # Destination for the collected data.
            "--project",
            project_id,
            "--table-id",
            table_id,
        ],
        image="gcr.io/moz-fx-data-airflow-prod-88e0/dap-collector_docker_etl:latest",
        gcp_conn_id="google_cloud_airflow_gke",
    )