diff --git a/dags/bhr_collection.py b/dags/bhr_collection.py index f0fbf733..e915481d 100644 --- a/dags/bhr_collection.py +++ b/dags/bhr_collection.py @@ -11,7 +11,7 @@ is maintained in the mozetl repository. * Migrated from Databricks and now running as a scheduled Dataproc task. * -The resulting aggregatations are used by the following service: +The resulting aggregations are used by the following service: https://fqueze.github.io/hang-stats/#date=[DATE]&row=0 """ @@ -27,13 +27,14 @@ from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner from utils.tags import Tag default_args = { - "owner": "kik@mozilla.com", + "owner": "bewu@mozilla.com", "depends_on_past": False, "start_date": datetime.datetime(2020, 11, 26), "email": [ "telemetry-alerts@mozilla.com", "kik@mozilla.com", "dothayer@mozilla.com", + "bewu@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, @@ -72,28 +73,39 @@ with DAG( params = get_dataproc_parameters("google_cloud_airflow_dataproc") + shared_runner_args = { + "parent_dag_name": dag.dag_id, + "image_version": "1.5-debian10", + "default_args": default_args, + "python_driver_code": "https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py", + "init_actions_uris": [ + "gs://dataproc-initialization-actions/python/pip-install.sh" + ], + "additional_metadata": { + "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0" + }, + "additional_properties": { + "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar", + "spark:spark.driver.memory": "18g", + "spark:spark.executor.memory": "24g", + }, + "idle_delete_ttl": 14400, + "num_workers": 6, + "master_machine_type": "n2-standard-8", + "worker_machine_type": "n2-highmem-4", + "gcp_conn_id": params.conn_id, + "service_account": params.client_email, + "storage_bucket": params.storage_bucket, + } + bhr_collection = SubDagOperator( task_id="bhr_collection", dag=dag, subdag=moz_dataproc_pyspark_runner( - parent_dag_name=dag.dag_id, - image_version="1.5-debian10", dag_name="bhr_collection", - default_args=default_args, cluster_name="bhr-collection-main-{{ ds }}", job_name="bhr-collection-main", - python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py", - init_actions_uris=[ - "gs://dataproc-initialization-actions/python/pip-install.sh" - ], - additional_metadata={ - "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0" - }, - additional_properties={ - "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar", - "spark:spark.driver.memory": "12g", - "spark:spark.executor.memory": "15g", - }, + **shared_runner_args, py_args=[ "--date", "{{ ds }}", @@ -105,12 +117,6 @@ with DAG( "--output-tag", "main", ], - idle_delete_ttl=14400, - num_workers=6, - worker_machine_type="n1-highmem-4", - gcp_conn_id=params.conn_id, - service_account=params.client_email, - storage_bucket=params.storage_bucket, ), ) @@ -118,24 +124,10 @@ with DAG( task_id="bhr_collection_child", dag=dag, subdag=moz_dataproc_pyspark_runner( - parent_dag_name=dag.dag_id, - image_version="1.5-debian10", dag_name="bhr_collection_child", - default_args=default_args, cluster_name="bhr-collection-child-{{ ds }}", job_name="bhr-collection-child", - python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py", - init_actions_uris=[ - "gs://dataproc-initialization-actions/python/pip-install.sh" - ], - additional_metadata={ - "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0" - }, - additional_properties={ - "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar", - "spark:spark.driver.memory": "12g", - "spark:spark.executor.memory": "15g", - }, + **shared_runner_args, py_args=[ "--date", "{{ ds }}", @@ -147,12 +139,6 @@ with DAG( "--output-tag", "child", ], - idle_delete_ttl=14400, - num_workers=6, - worker_machine_type="n1-highmem-4", - gcp_conn_id=params.conn_id, - service_account=params.client_email, - storage_bucket=params.storage_bucket, ), )