fix: Increase bhr_collection spark memory (#1995)

Ben Wu 2024-05-24 15:54:32 -04:00 committed by GitHub
Parent f17fa691c9
Commit b9a2fc850c
No known key found for this signature
GPG key ID: B5690EEEBB952194
1 changed file with 30 additions and 44 deletions


@@ -11,7 +11,7 @@ is maintained in the mozetl repository.
 * Migrated from Databricks and now running as a scheduled Dataproc task. *

-The resulting aggregatations are used by the following service:
+The resulting aggregations are used by the following service:
 https://fqueze.github.io/hang-stats/#date=[DATE]&row=0
 """
@@ -27,13 +27,14 @@ from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
 from utils.tags import Tag

 default_args = {
-    "owner": "kik@mozilla.com",
+    "owner": "bewu@mozilla.com",
     "depends_on_past": False,
     "start_date": datetime.datetime(2020, 11, 26),
     "email": [
         "telemetry-alerts@mozilla.com",
         "kik@mozilla.com",
         "dothayer@mozilla.com",
+        "bewu@mozilla.com",
     ],
     "email_on_failure": True,
     "email_on_retry": True,
@@ -72,28 +73,39 @@ with DAG(
     params = get_dataproc_parameters("google_cloud_airflow_dataproc")

+    shared_runner_args = {
+        "parent_dag_name": dag.dag_id,
+        "image_version": "1.5-debian10",
+        "default_args": default_args,
+        "python_driver_code": "https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py",
+        "init_actions_uris": [
+            "gs://dataproc-initialization-actions/python/pip-install.sh"
+        ],
+        "additional_metadata": {
+            "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0"
+        },
+        "additional_properties": {
+            "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
+            "spark:spark.driver.memory": "18g",
+            "spark:spark.executor.memory": "24g",
+        },
+        "idle_delete_ttl": 14400,
+        "num_workers": 6,
+        "master_machine_type": "n2-standard-8",
+        "worker_machine_type": "n2-highmem-4",
+        "gcp_conn_id": params.conn_id,
+        "service_account": params.client_email,
+        "storage_bucket": params.storage_bucket,
+    }
+
     bhr_collection = SubDagOperator(
         task_id="bhr_collection",
         dag=dag,
         subdag=moz_dataproc_pyspark_runner(
-            parent_dag_name=dag.dag_id,
-            image_version="1.5-debian10",
             dag_name="bhr_collection",
-            default_args=default_args,
             cluster_name="bhr-collection-main-{{ ds }}",
             job_name="bhr-collection-main",
-            python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py",
-            init_actions_uris=[
-                "gs://dataproc-initialization-actions/python/pip-install.sh"
-            ],
-            additional_metadata={
-                "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0"
-            },
-            additional_properties={
-                "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
-                "spark:spark.driver.memory": "12g",
-                "spark:spark.executor.memory": "15g",
-            },
+            **shared_runner_args,
             py_args=[
                 "--date",
                 "{{ ds }}",
@@ -105,12 +117,6 @@ with DAG(
                 "--output-tag",
                 "main",
             ],
-            idle_delete_ttl=14400,
-            num_workers=6,
-            worker_machine_type="n1-highmem-4",
-            gcp_conn_id=params.conn_id,
-            service_account=params.client_email,
-            storage_bucket=params.storage_bucket,
         ),
     )
@@ -118,24 +124,10 @@ with DAG(
         task_id="bhr_collection_child",
         dag=dag,
         subdag=moz_dataproc_pyspark_runner(
-            parent_dag_name=dag.dag_id,
-            image_version="1.5-debian10",
             dag_name="bhr_collection_child",
-            default_args=default_args,
             cluster_name="bhr-collection-child-{{ ds }}",
             job_name="bhr-collection-child",
-            python_driver_code="https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py",
-            init_actions_uris=[
-                "gs://dataproc-initialization-actions/python/pip-install.sh"
-            ],
-            additional_metadata={
-                "PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0"
-            },
-            additional_properties={
-                "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
-                "spark:spark.driver.memory": "12g",
-                "spark:spark.executor.memory": "15g",
-            },
+            **shared_runner_args,
             py_args=[
                 "--date",
                 "{{ ds }}",
@@ -147,12 +139,6 @@ with DAG(
                 "--output-tag",
                 "child",
             ],
-            idle_delete_ttl=14400,
-            num_workers=6,
-            worker_machine_type="n1-highmem-4",
-            gcp_conn_id=params.conn_id,
-            service_account=params.client_email,
-            storage_bucket=params.storage_bucket,
         ),
     )
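
The refactor above is a plain Python kwargs-splat pattern: every argument common to both Dataproc clusters moves into one dict, which is then expanded into each moz_dataproc_pyspark_runner call. A minimal runnable sketch of just that pattern, where make_runner is a hypothetical stand-in for the real runner factory:

# Minimal sketch (not from the commit) of the kwargs-splat pattern used above.
# `make_runner` is a hypothetical stand-in for moz_dataproc_pyspark_runner.
shared_runner_args = {
    "image_version": "1.5-debian10",
    "num_workers": 6,
    "additional_properties": {
        "spark:spark.driver.memory": "18g",
        "spark:spark.executor.memory": "24g",
    },
}


def make_runner(**kwargs):
    # Stand-in: return the merged keyword arguments so they can be inspected.
    return kwargs


# Per-task values stay at the call site; everything shared is splatted in.
# Python raises TypeError on duplicate keyword arguments, so each key must
# live in exactly one place: the shared dict or the call site, never both.
main = make_runner(dag_name="bhr_collection", **shared_runner_args)
child = make_runner(dag_name="bhr_collection_child", **shared_runner_args)

assert main["num_workers"] == child["num_workers"] == 6

That duplicate-keyword rule is why only the per-task values (dag_name, cluster_name, job_name, py_args) remain at the call sites in the diff. The "spark:" prefix on the memory keys follows the Dataproc cluster-properties convention, which routes those settings into Spark's configuration when the cluster is created.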