# Source listing metadata: 147 lines, 5.0 KiB, Python
"""
A job to power the graphics dashboard.

Processes main ping data and exports to GCS to power a graphics dashboard at
https://firefoxgraphics.github.io/telemetry/.

This was originally a Databricks notebook that was migrated to a scheduled
Dataproc task. Source code lives in the
[FirefoxGraphics/telemetry](https://github.com/FirefoxGraphics/telemetry)
repository.

This is an overwrite kind of operation and as long as the most recent DAG run succeeded
the job should be considered healthy.
"""

import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
from utils.tags import Tag

# Default task arguments; passed both to the DAG and to each Dataproc sub-DAG
# built by moz_dataproc_pyspark_runner.
default_args = {
    "owner": "kik@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime.datetime(2020, 11, 26),
    "email": [
        "telemetry-alerts@mozilla.com",
        "kik@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    # Up to two retries, spaced 20 minutes apart.
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=20),
}

# Python packages for the Dataproc clusters; joined with spaces and passed as
# the PIP_PACKAGES cluster metadata value alongside the pip-install.sh init
# action.
# NOTE(review): python_moztelemetry is pinned to tag v0.10.7, while the
# FirefoxGraphics/telemetry requirement carries no "@ref" and therefore
# follows that repository's default branch.
PIP_PACKAGES = [
    "git+https://github.com/mozilla/python_moztelemetry.git@v0.10.7#egg=python-moztelemetry",
    "git+https://github.com/FirefoxGraphics/telemetry.git#egg=pkg&subdirectory=analyses/bigquery_shim",
    "boto3==1.16.20",
    "six==1.15.0",
]

# Output location handed to both PySpark jobs through their command-line
# arguments.
# NOTE(review): "websit" is reproduced verbatim from the original; it looks
# like a truncated name rather than a typo -- confirm before "correcting" it.
GCS_BUCKET = "moz-fx-data-static-websit-8565-analysis-output"
GCS_PREFIX = "gfx/telemetry-data/"

# DAG-level tags; this DAG is marked with the tier_1 impact tier.
tags = [Tag.ImpactTier.tier_1]

# Daily DAG (03:00 per the cron expression) that waits for the deduplicated
# main ping data and then runs two independent Dataproc PySpark jobs.
with DAG(
    "graphics_telemetry",
    default_args=default_args,
    schedule_interval="0 3 * * *",
    doc_md=__doc__,
    tags=tags,
) as dag:
    # Gate on copy_deduplicate.copy_deduplicate_main_ping. execution_delta
    # makes the sensor look at the upstream run whose execution date is two
    # hours before this DAG's own (03:00 - 2h = 01:00).
    wait_for_main_ping = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate_main_ping",
        external_dag_id="copy_deduplicate",
        external_task_id="copy_deduplicate_main_ping",
        execution_delta=datetime.timedelta(hours=2),
        check_existence=True,
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        # Overrides default_args: no retry e-mails for the sensor.
        email_on_retry=False,
        dag=dag,
    )

    # Connection-derived settings (conn_id, client_email, storage_bucket)
    # shared by both Dataproc jobs below.
    params = get_dataproc_parameters("google_cloud_airflow_dataproc")

    # Trends job: runs graphics_telemetry_trends.py with a weekly sampling
    # fraction on n1-standard-4 workers.
    graphics_trends = SubDagOperator(
        task_id="graphics_trends",
        dag=dag,
        subdag=moz_dataproc_pyspark_runner(
            parent_dag_name=dag.dag_id,
            image_version="1.5-debian10",
            dag_name="graphics_trends",
            default_args=default_args,
            cluster_name="graphics-trends-{{ ds }}",
            job_name="graphics-trends",
            python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/graphics/graphics_telemetry_trends.py",
            init_actions_uris=[
                "gs://dataproc-initialization-actions/python/pip-install.sh"
            ],
            additional_metadata={"PIP_PACKAGES": " ".join(PIP_PACKAGES)},
            additional_properties={
                "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
            },
            py_args=[
                "--gcs-bucket",
                GCS_BUCKET,
                "--gcs-prefix",
                GCS_PREFIX,
                "--weekly-fraction",
                "0.003",
            ],
            # NOTE(review): presumably seconds (14400 s = 4 h) -- confirm
            # against moz_dataproc_pyspark_runner.
            idle_delete_ttl=14400,
            num_workers=2,
            worker_machine_type="n1-standard-4",
            gcp_conn_id=params.conn_id,
            service_account=params.client_email,
            storage_bucket=params.storage_bucket,
        ),
    )

    # Dashboard job: runs graphics_telemetry_dashboard.py. Note that it uses
    # different flag names (--output-bucket/--output-prefix/--release-fraction)
    # and high-memory workers (n1-highmem-4).
    graphics_dashboard = SubDagOperator(
        task_id="graphics_dashboard",
        dag=dag,
        subdag=moz_dataproc_pyspark_runner(
            parent_dag_name=dag.dag_id,
            image_version="1.5-debian10",
            dag_name="graphics_dashboard",
            default_args=default_args,
            cluster_name="graphics-dashboard-{{ ds }}",
            job_name="graphics-dashboard",
            python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/graphics/graphics_telemetry_dashboard.py",
            init_actions_uris=[
                "gs://dataproc-initialization-actions/python/pip-install.sh"
            ],
            additional_metadata={"PIP_PACKAGES": " ".join(PIP_PACKAGES)},
            additional_properties={
                "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
            },
            py_args=[
                "--output-bucket",
                GCS_BUCKET,
                "--output-prefix",
                GCS_PREFIX,
                "--release-fraction",
                "0.003",
            ],
            # NOTE(review): presumably seconds (14400 s = 4 h) -- confirm
            # against moz_dataproc_pyspark_runner.
            idle_delete_ttl=14400,
            num_workers=2,
            worker_machine_type="n1-highmem-4",
            gcp_conn_id=params.conn_id,
            service_account=params.client_email,
            storage_bucket=params.storage_bucket,
        ),
    )

    # Both jobs depend only on the sensor and are independent of each other.
    wait_for_main_ping >> graphics_trends
    wait_for_main_ping >> graphics_dashboard