telemetry-airflow/utils/dataproc.py


import os
from collections import namedtuple
from airflow import models
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
# When Google deprecates dataproc_v1beta2 in the DataprocHook/Operator classes,
# we can import these from our patched code rather than upgrading/deploying
# apache-airflow-providers-google > 6.0.0 and google-cloud-dataproc > 2.5.0
# from utils.patched.dataproc_operator import (
from airflow.providers.google.cloud.operators.dataproc import (
ClusterGenerator,
DataprocCreateClusterOperator,
DataprocDeleteClusterOperator,
DataprocSubmitPySparkJobOperator,
DataprocSubmitSparkJobOperator,
)
class DataProcHelper:
"""Helper class for creating/deleting dataproc clusters."""
def __init__(
self,
cluster_name=None,
job_name=None,
num_workers=2,
image_version="1.4-debian10",
region="us-west1",
subnetwork_uri=None,
internal_ip_only=None,
idle_delete_ttl=14400,
auto_delete_ttl=28800,
master_machine_type="n1-standard-8",
worker_machine_type="n1-standard-4",
num_preemptible_workers=0,
service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
init_actions_uris=None,
additional_metadata=None,
additional_properties=None,
optional_components=None,
install_component_gateway=True,
aws_conn_id=None,
gcp_conn_id="google_cloud_airflow_dataproc",
project_id="airflow-dataproc",
artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts",
storage_bucket="moz-fx-data-prod-dataproc-scratch",
master_disk_type="pd-standard",
master_disk_size=1024,
master_num_local_ssds=0,
worker_disk_type="pd-standard",
worker_disk_size=1024,
worker_num_local_ssds=0,
):
if optional_components is None:
optional_components = ["ANACONDA"]
self.cluster_name = cluster_name
self.job_name = job_name
self.num_workers = num_workers
self.image_version = image_version
self.region = region
self.subnetwork_uri = subnetwork_uri
self.internal_ip_only = internal_ip_only
self.idle_delete_ttl = idle_delete_ttl
self.auto_delete_ttl = auto_delete_ttl
self.master_machine_type = master_machine_type
self.worker_machine_type = worker_machine_type
self.num_preemptible_workers = num_preemptible_workers
self.service_account = service_account
# The bucket with a default dataproc init script
self.artifact_bucket = artifact_bucket
self.storage_bucket = storage_bucket
self.master_disk_type = master_disk_type
self.master_disk_size = master_disk_size
self.master_num_local_ssds = master_num_local_ssds
self.worker_disk_type = worker_disk_type
self.worker_disk_size = worker_disk_size
self.worker_num_local_ssds = worker_num_local_ssds
if init_actions_uris is None:
self.init_actions_uris = [
f"gs://{self.artifact_bucket}/bootstrap/dataproc_init.sh"
]
else:
self.init_actions_uris = init_actions_uris
if additional_metadata is None:
self.additional_metadata = {}
else:
self.additional_metadata = additional_metadata
if additional_properties is None:
self.additional_properties = {}
else:
self.additional_properties = additional_properties
self.optional_components = optional_components
self.install_component_gateway = install_component_gateway
self.aws_conn_id = aws_conn_id
self.gcp_conn_id = gcp_conn_id
self.project_id = project_id
def create_cluster(self):
"""Return a DataprocCreateClusterOperator."""
properties = {}
# Google cloud storage requires object.create permission when reading from pyspark
properties["core:fs.gs.implicit.dir.repair.enable"] = "false"
# Set hadoop properties to access s3 from dataproc
if self.aws_conn_id:
for key, value in zip(
("access.key", "secret.key", "session.token"),
AwsBaseHook(
aws_conn_id=self.aws_conn_id, client_type="s3"
).get_credentials(),
):
if value is not None:
properties["core:fs.s3a." + key] = value
# For older spark versions we need to set the properties differently
if key == "access.key":
properties["core:fs.s3.awsAccessKeyId"] = value
elif key == "secret.key":
properties["core:fs.s3.awsSecretAccessKey"] = value
properties.update(self.additional_properties)
metadata = {
"gcs-connector-version": "1.9.16",
"bigquery-connector-version": "0.13.6",
}
metadata.update(self.additional_metadata)
cluster_generator = ClusterGenerator(
project_id=self.project_id,
num_workers=self.num_workers,
subnetwork_uri=self.subnetwork_uri,
internal_ip_only=self.internal_ip_only,
storage_bucket=self.storage_bucket,
init_actions_uris=self.init_actions_uris,
metadata=metadata,
image_version=self.image_version,
properties=properties,
optional_components=self.optional_components,
master_machine_type=self.master_machine_type,
master_disk_type=self.master_disk_type,
master_disk_size=self.master_disk_size,
worker_machine_type=self.worker_machine_type,
worker_disk_type=self.worker_disk_type,
worker_disk_size=self.worker_disk_size,
num_preemptible_workers=self.num_preemptible_workers,
service_account=self.service_account,
idle_delete_ttl=self.idle_delete_ttl,
auto_delete_ttl=self.auto_delete_ttl,
)
cluster_config = cluster_generator.make()
        # The DataprocCreateClusterOperator and ClusterGenerator don't support
        # component gateway or local SSDs.
# ClusterConfig format is
# https://cloud.google.com/dataproc/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.ClusterConfig
if self.install_component_gateway:
cluster_config.update(
{"endpoint_config": {"enable_http_port_access": True}}
)
if self.master_num_local_ssds > 0:
master_instance_group_config = cluster_config["master_config"]
master_instance_group_config["disk_config"][
"num_local_ssds"
] = self.master_num_local_ssds
cluster_config.update({"master_config": master_instance_group_config})
if self.worker_num_local_ssds > 0:
worker_instance_group_config = cluster_config["worker_config"]
worker_instance_group_config["disk_config"][
"num_local_ssds"
] = self.worker_num_local_ssds
cluster_config.update({"worker_config": worker_instance_group_config})
return DataprocCreateClusterOperator(
task_id="create_dataproc_cluster",
cluster_name=self.cluster_name,
project_id=self.project_id,
use_if_exists=True,
delete_on_error=True,
labels={
"env": os.getenv("DEPLOY_ENVIRONMENT", "env_not_set"),
"owner": os.getenv("AIRFLOW_CTX_DAG_OWNER", "owner_not_set"),
"jobname": self.job_name.lower().replace("_", "-"),
},
gcp_conn_id=self.gcp_conn_id,
region=self.region,
cluster_config=cluster_config,
)
def delete_cluster(self):
"""Return a DataprocDeleteClusterOperator."""
return DataprocDeleteClusterOperator(
task_id="delete_dataproc_cluster",
cluster_name=self.cluster_name,
region=self.region,
gcp_conn_id=self.gcp_conn_id,
project_id=self.project_id,
)
# End DataProcHelper
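# A minimal usage sketch of DataProcHelper (illustrative only; the cluster name,
# job name, and submit operator below are hypothetical). The moz_dataproc_*
# wrappers below wire the same three pieces together inside a SubDag:
#
#   helper = DataProcHelper(cluster_name="example-cluster", job_name="example_job")
#   create_cluster = helper.create_cluster()
#   delete_cluster = helper.delete_cluster()
#   create_cluster >> some_submit_job_operator >> delete_cluster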
def moz_dataproc_pyspark_runner(
parent_dag_name=None,
dag_name="run_pyspark_on_dataproc",
default_args=None,
cluster_name=None,
num_workers=2,
image_version="1.4-debian10",
region="us-west1",
subnetwork_uri=None,
internal_ip_only=None,
idle_delete_ttl=10800,
auto_delete_ttl=21600,
master_machine_type="n1-standard-8",
worker_machine_type="n1-standard-4",
num_preemptible_workers=0,
service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
init_actions_uris=None,
additional_metadata=None,
additional_properties=None,
optional_components=None,
install_component_gateway=True,
python_driver_code=None,
py_args=None,
job_name=None,
aws_conn_id=None,
gcp_conn_id="google_cloud_airflow_dataproc",
project_id="airflow-dataproc",
artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts",
storage_bucket="moz-fx-data-prod-dataproc-scratch",
master_disk_type="pd-standard",
worker_disk_type="pd-standard",
master_disk_size=1024,
worker_disk_size=1024,
master_num_local_ssds=0,
worker_num_local_ssds=0,
):
"""
Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.
    Then we call DataprocSubmitPySparkJobOperator to execute the pyspark script defined by the
    python_driver_code argument. Once that succeeds, we tear down the cluster.
**Example**: ::
# Unsalted cluster name so subsequent runs fail if the cluster name exists
cluster_name = 'test-dataproc-cluster-hwoo'
# Defined in Airflow's UI -> Admin -> Connections
gcp_conn_id = 'google_cloud_airflow_dataproc'
run_dataproc_pyspark = SubDagOperator(
task_id='run_dataproc_pyspark',
dag=dag,
subdag = moz_dataproc_pyspark_runner(
parent_dag_name=dag.dag_id,
dag_name='run_dataproc_pyspark',
job_name='Do_something_on_pyspark',
default_args=default_args,
cluster_name=cluster_name,
python_driver_code='gs://some_bucket/some_py_script.py',
py_args=["-d", "{{ ds_nodash }}"],
gcp_conn_id=gcp_conn_id)
)
Airflow related args:
---
:param str parent_dag_name: Parent dag name.
:param str dag_name: Dag name.
:param dict default_args: Dag configuration.
Dataproc Cluster related args:
---
:param str cluster_name: The name of the dataproc cluster.
:param int num_workers: The number of spark workers.
:param str image_version: The image version of software to use for dataproc
cluster.
:param str region: Region where the dataproc cluster will be located.
Zone will be chosen automatically
    :param str subnetwork_uri: The subnetwork URI to be used for machine communication;
                               cannot be specified together with network_uri. Only needed
                               when setting internal_ip_only = True (see the next parameter).
    :param bool internal_ip_only: If True, cluster nodes will only have internal IP addresses.
                                  Can only be enabled when a subnetwork_uri is specified.
                                  We use this for NAT'd dataproc clusters whose outbound traffic
                                  needs to be whitelisted. To use a NAT'd cluster, set
                                  subnetwork_uri='default', internal_ip_only=True, and
                                  region=us-west2-a|b|c
:param int idle_delete_ttl: The duration in seconds to keep idle cluster alive.
:param int auto_delete_ttl: The duration in seconds that the cluster will live.
:param str master_machine_type: Compute engine machine type to use for master.
:param str worker_machine_type: Compute engine machine type to use for the workers.
:param int num_preemptible_workers: Number of preemptible worker nodes to spin up.
:param str service_account: The service account for spark VMs to use. For example
if cross project access is needed. Note that this svc
account needs the following permissions:
roles/logging.logWriter and roles/storage.objectAdmin.
:param list init_actions_uris: List of GCS uri's containing dataproc init scripts.
    :param dict additional_metadata: Custom metadata keys and values, might be used to
                                     configure initialization actions.
    :param dict additional_properties: Custom cluster properties, can be used to configure
                                       cluster components, add Spark packages, etc.
:param str job_name: Name of the spark job to run.
:param str aws_conn_id: Airflow connection id for S3 access (if needed).
:param str gcp_conn_id: The connection ID to use connecting to GCP.
:param str project_id: The project ID corresponding to the gcp_conn_id. We
add this because the dev environment doesn't parse it out
correctly from the dummy connections.
:param str artifact_bucket: Path to resources for bootstrapping the dataproc cluster
:param str storage_bucket: Path to scratch bucket for intermediate cluster results
:param list optional_components: List of optional components to install on cluster
Defaults to ['ANACONDA'] for now since JUPYTER is broken.
    :param bool install_component_gateway: Enable the alpha-feature component gateway.
:param master_disk_type: Type of the boot disk for the master node
(default is ``pd-standard``).
Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
``pd-standard`` (Persistent Disk Hard Disk Drive).
:type master_disk_type: str
:param master_disk_size: Disk size for the master node
:type master_disk_size: int
    :param master_num_local_ssds: Number of local SSDs to mount
        (default is 0)
    :type master_num_local_ssds: int
:param worker_disk_type: Type of the boot disk for the worker node
(default is ``pd-standard``).
Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
``pd-standard`` (Persistent Disk Hard Disk Drive).
:type worker_disk_type: str
:param worker_disk_size: Disk size for the worker node
:type worker_disk_size: int
    :param worker_num_local_ssds: Number of local SSDs to mount
        (default is 0)
    :type worker_num_local_ssds: int
Pyspark related args:
---
:param str python_driver_code: The Hadoop Compatible Filesystem (HCFS) URI of the main
Python file to use as the driver. Must be a .py file.
:param list py_args: Arguments for the pyspark job.
"""
if optional_components is None:
optional_components = ["ANACONDA"]
if cluster_name is None or python_driver_code is None:
        raise AirflowException("Please specify both cluster_name and python_driver_code.")
dataproc_helper = DataProcHelper(
cluster_name=cluster_name,
job_name=job_name,
num_workers=num_workers,
image_version=image_version,
region=region,
subnetwork_uri=subnetwork_uri,
internal_ip_only=internal_ip_only,
idle_delete_ttl=idle_delete_ttl,
auto_delete_ttl=auto_delete_ttl,
master_machine_type=master_machine_type,
worker_machine_type=worker_machine_type,
num_preemptible_workers=num_preemptible_workers,
service_account=service_account,
init_actions_uris=init_actions_uris,
optional_components=optional_components,
additional_metadata=additional_metadata,
additional_properties=additional_properties,
install_component_gateway=install_component_gateway,
aws_conn_id=aws_conn_id,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
artifact_bucket=artifact_bucket,
storage_bucket=storage_bucket,
master_disk_type=master_disk_type,
master_disk_size=master_disk_size,
worker_disk_type=worker_disk_type,
worker_disk_size=worker_disk_size,
master_num_local_ssds=master_num_local_ssds,
worker_num_local_ssds=worker_num_local_ssds,
)
_dag_name = f"{parent_dag_name}.{dag_name}"
with models.DAG(_dag_name, default_args=default_args) as dag:
create_dataproc_cluster = dataproc_helper.create_cluster()
run_pyspark_on_dataproc = DataprocSubmitPySparkJobOperator(
task_id="run_dataproc_pyspark",
job_name=job_name,
cluster_name=cluster_name,
region=region,
main=python_driver_code,
arguments=py_args,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
)
delete_dataproc_cluster = dataproc_helper.delete_cluster()
create_dataproc_cluster >> run_pyspark_on_dataproc >> delete_dataproc_cluster
return dag
# End moz_dataproc_pyspark_runner
def moz_dataproc_jar_runner(
parent_dag_name=None,
dag_name="run_script_on_dataproc",
default_args=None,
cluster_name=None,
num_workers=2,
image_version="1.4-debian10",
region="us-west1",
subnetwork_uri=None,
internal_ip_only=None,
idle_delete_ttl=14400,
auto_delete_ttl=28800,
master_machine_type="n1-standard-8",
worker_machine_type="n1-standard-4",
num_preemptible_workers=0,
service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
init_actions_uris=None,
optional_components=None,
install_component_gateway=True,
jar_urls=None,
main_class=None,
jar_args=None,
job_name=None,
aws_conn_id=None,
gcp_conn_id="google_cloud_airflow_dataproc",
project_id="airflow-dataproc",
master_disk_type="pd-standard",
worker_disk_type="pd-standard",
master_disk_size=1024,
worker_disk_size=1024,
master_num_local_ssds=0,
worker_num_local_ssds=0,
):
"""
Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.
    Then we call DataprocSubmitSparkJobOperator to execute the jar defined by the jar_urls
    and main_class arguments. Once that succeeds, we tear down the cluster.
**Example**: ::
# Unsalted cluster name so subsequent runs fail if the cluster name exists
cluster_name = 'test-dataproc-cluster-hwoo'
# Defined in Airflow's UI -> Admin -> Connections
gcp_conn_id = 'google_cloud_airflow_dataproc'
run_dataproc_jar = SubDagOperator(
task_id='run_dataproc_jar',
dag=dag,
subdag = moz_dataproc_jar_runner(
parent_dag_name=dag.dag_id,
dag_name='run_dataproc_jar',
job_name='Run_some_spark_jar_on_dataproc',
default_args=default_args,
cluster_name=cluster_name,
jar_urls=['gs://some_bucket/some_jar.jar'],
main_class='com.mozilla.path.to.ClassName',
jar_args=["-d", "{{ ds_nodash }}"],
gcp_conn_id=gcp_conn_id)
)
Airflow related args:
---
See moz_dataproc_pyspark_runner
Dataproc Cluster related args:
---
See moz_dataproc_pyspark_runner
Jar runner related args:
---
    :param list jar_urls: URIs of jars provisioned in Cloud Storage (for example, UDFs
                          and libraries); these are good candidates for default arguments.
:param str main_class: Name of the job class entrypoint to execute.
:param list jar_args: Arguments for the job.
"""
if optional_components is None:
optional_components = ["ANACONDA"]
if cluster_name is None or jar_urls is None or main_class is None:
raise AirflowException(
"Please specify cluster_name, jar_urls, and/or main_class."
)
dataproc_helper = DataProcHelper(
cluster_name=cluster_name,
job_name=job_name,
num_workers=num_workers,
image_version=image_version,
region=region,
subnetwork_uri=subnetwork_uri,
internal_ip_only=internal_ip_only,
idle_delete_ttl=idle_delete_ttl,
auto_delete_ttl=auto_delete_ttl,
master_machine_type=master_machine_type,
worker_machine_type=worker_machine_type,
num_preemptible_workers=num_preemptible_workers,
service_account=service_account,
init_actions_uris=init_actions_uris,
optional_components=optional_components,
install_component_gateway=install_component_gateway,
aws_conn_id=aws_conn_id,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
master_disk_type=master_disk_type,
master_disk_size=master_disk_size,
worker_disk_type=worker_disk_type,
worker_disk_size=worker_disk_size,
master_num_local_ssds=master_num_local_ssds,
worker_num_local_ssds=worker_num_local_ssds,
)
_dag_name = f"{parent_dag_name}.{dag_name}"
with models.DAG(_dag_name, default_args=default_args) as dag:
create_dataproc_cluster = dataproc_helper.create_cluster()
run_jar_on_dataproc = DataprocSubmitSparkJobOperator(
cluster_name=cluster_name,
region=region,
task_id="run_jar_on_dataproc",
job_name=job_name,
dataproc_jars=jar_urls,
main_class=main_class,
arguments=jar_args,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
)
delete_dataproc_cluster = dataproc_helper.delete_cluster()
create_dataproc_cluster >> run_jar_on_dataproc >> delete_dataproc_cluster
return dag
# End moz_dataproc_jar_runner
def _format_envvar(env=None):
# Use a default value if an environment dictionary isn't supplied
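    # e.g. _format_envvar({"date": "20230101", "bucket": "some-bucket"})
    # returns "date=20230101 bucket=some-bucket"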
return " ".join([f"{k}={v}" for k, v in (env or {}).items()])
def moz_dataproc_scriptrunner(
parent_dag_name=None,
dag_name="run_script_on_dataproc",
default_args=None,
cluster_name=None,
num_workers=2,
image_version="1.4-debian10",
region="us-west1",
subnetwork_uri=None,
internal_ip_only=None,
idle_delete_ttl=14400,
auto_delete_ttl=28800,
master_machine_type="n1-standard-8",
worker_machine_type="n1-standard-4",
num_preemptible_workers=0,
service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
init_actions_uris=None,
optional_components=None,
install_component_gateway=True,
uri=None,
env=None,
arguments=None,
job_name=None,
aws_conn_id=None,
gcp_conn_id="google_cloud_airflow_dataproc",
project_id="airflow-dataproc",
master_disk_type="pd-standard",
worker_disk_type="pd-standard",
master_disk_size=1024,
worker_disk_size=1024,
master_num_local_ssds=0,
worker_num_local_ssds=0,
):
"""
Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.
    Then we execute a script URI (either HTTPS or GCS), similar to how we use our custom AWS
    EmrSparkOperator. This calls DataprocSubmitSparkJobOperator using EMR's script-runner.jar,
    which then executes the airflow_gcp.sh entrypoint script. The entrypoint script expects
    another script URI, along with its arguments, as parameters. Once that succeeds, we tear
    down the cluster.
**Example**: ::
# Unsalted cluster name so subsequent runs fail if the cluster name exists
cluster_name = 'test-dataproc-cluster-hwoo'
# Defined in Airflow's UI -> Admin -> Connections
gcp_conn_id = 'google_cloud_airflow_dataproc'
run_dataproc_script = SubDagOperator(
task_id='run_dataproc_script',
dag=dag,
subdag = moz_dataproc_scriptrunner(
parent_dag_name=dag.dag_id,
dag_name='run_dataproc_script',
default_args=default_args,
cluster_name=cluster_name,
job_name='Run_a_script_on_dataproc',
uri='https://raw.githubusercontent.com/mozilla/telemetry-airflow/main/jobs/some_bash_or_py_script.py',
env={"date": "{{ ds_nodash }}"},
arguments="-d {{ ds_nodash }}",
gcp_conn_id=gcp_conn_id)
)
Airflow related args:
---
See moz_dataproc_pyspark_runner
Dataproc Cluster related args:
---
See moz_dataproc_pyspark_runner
Scriptrunner specific args:
---
    :param str uri: The HTTP or GCS URI of the script to run. Can be a .py, .jar, or
                    other type of script (e.g. bash). It is run via the airflow_gcp.sh
                    entrypoint. .ipynb is no longer supported.
:param dict env: If env is not None, it must be a mapping that defines
the environment variables for the new process
(templated).
    :param str arguments: Passed to `airflow_gcp.sh` as one long string of
                          space-separated args.
"""
if optional_components is None:
optional_components = ["ANACONDA"]
if job_name is None or uri is None or cluster_name is None:
raise AirflowException("Please specify job_name, uri, and cluster_name.")
dataproc_helper = DataProcHelper(
cluster_name=cluster_name,
job_name=job_name,
num_workers=num_workers,
image_version=image_version,
region=region,
subnetwork_uri=subnetwork_uri,
internal_ip_only=internal_ip_only,
idle_delete_ttl=idle_delete_ttl,
auto_delete_ttl=auto_delete_ttl,
master_machine_type=master_machine_type,
worker_machine_type=worker_machine_type,
num_preemptible_workers=num_preemptible_workers,
service_account=service_account,
init_actions_uris=init_actions_uris,
optional_components=optional_components,
install_component_gateway=install_component_gateway,
aws_conn_id=aws_conn_id,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
master_disk_type=master_disk_type,
master_disk_size=master_disk_size,
worker_disk_type=worker_disk_type,
worker_disk_size=worker_disk_size,
master_num_local_ssds=master_num_local_ssds,
worker_num_local_ssds=worker_num_local_ssds,
)
_dag_name = f"{parent_dag_name}.{dag_name}"
environment = _format_envvar(env)
script_bucket = "moz-fx-data-prod-airflow-dataproc-artifacts"
jar_url = f"gs://{script_bucket}/bin/script-runner.jar"
args = [
f"gs://{script_bucket}/bootstrap/airflow_gcp.sh",
"--job-name",
job_name,
"--uri",
uri,
"--environment",
environment,
]
if arguments:
args += ["--arguments", arguments]
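    # The assembled Spark job is effectively (placeholders are filled in above):
    #   ScriptRunner gs://<script_bucket>/bootstrap/airflow_gcp.sh \
    #       --job-name <job_name> --uri <uri> --environment "<k=v ...>" [--arguments "<arguments>"]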
with models.DAG(_dag_name, default_args=default_args) as dag:
create_dataproc_cluster = dataproc_helper.create_cluster()
# Run DataprocSubmitSparkJobOperator with script-runner.jar pointing to airflow_gcp.sh.
run_script_on_dataproc = DataprocSubmitSparkJobOperator(
cluster_name=cluster_name,
region=region,
task_id="run_script_on_dataproc",
job_name=job_name,
dataproc_jars=[jar_url],
main_class="com.amazon.elasticmapreduce.scriptrunner.ScriptRunner",
arguments=args,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
)
delete_dataproc_cluster = dataproc_helper.delete_cluster()
create_dataproc_cluster >> run_script_on_dataproc >> delete_dataproc_cluster
return dag
# End moz_dataproc_scriptrunner
def copy_artifacts_dev(dag, project_id, artifact_bucket, storage_bucket):
"""
Bootstrap a dataproc job for local testing.
This job requires setting GOOGLE_APPLICATION_CREDENTIALS before starting the
airflow container. It will copy the contents of the local jobs and
    dataproc_bootstrap folders to the artifacts bucket, and create a scratch
    storage bucket for dataproc.

    :param DAG dag: The DAG to register the job with
    :param str project_id: The project id, necessary for setting the default project
    :param str artifact_bucket: The bucket for storing bootstrap artifacts
    :param str storage_bucket: The scratch bucket for dataproc
"""
return BashOperator(
task_id="copy_to_dev_artifacts",
bash_command="""
gcloud auth activate-service-account --key-file ~/.credentials || cat ~/.credentials
gcloud config set project ${PROJECT_ID}
gsutil mb gs://${ARTIFACT_BUCKET}
gsutil mb gs://${STORAGE_BUCKET}
gsutil -m cp -r ~/dataproc_bootstrap gs://${ARTIFACT_BUCKET}
gsutil -m cp -r ~/jobs gs://${ARTIFACT_BUCKET}
echo "listing artifacts..."
gsutil ls -r gs://${ARTIFACT_BUCKET}
""",
env={
# https://github.com/GoogleCloudPlatform/gsutil/issues/236
"CLOUDSDK_PYTHON": "python",
"PROJECT_ID": project_id,
"ARTIFACT_BUCKET": artifact_bucket,
"STORAGE_BUCKET": storage_bucket,
},
dag=dag,
)
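# Illustrative dev-only wiring (DAG and variable names are hypothetical); see
# get_dataproc_parameters below:
#
#   params = get_dataproc_parameters("google_cloud_airflow_dataproc")
#   if params.is_dev:
#       copy_to_dev_artifacts = copy_artifacts_dev(
#           dag, params.project_id, params.artifact_bucket, params.storage_bucket
#       )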
# parameters that can be used to reconfigure a dataproc job for dev testing
DataprocParameters = namedtuple(
"DataprocParameters",
[
"conn_id",
"project_id",
"is_dev",
"client_email",
"artifact_bucket",
"storage_bucket",
"output_bucket",
],
)
def get_dataproc_parameters(conn_id="google_cloud_airflow_dataproc"):
"""
    Gather Dataproc parameters, substituting development values when the deploy
    environment is "dev".

    The provided connection id should be a Google Cloud connection and should either be
    the production default ("dataproc-runner-prod") or a service key associated with a
    sandbox account.
"""
dev_project_id = "replace_me"
dev_client_email = "replace_me"
is_dev = os.environ.get("DEPLOY_ENVIRONMENT") == "dev"
    project_id = dev_project_id if is_dev else "airflow-dataproc"
client_email = (
dev_client_email
if is_dev
else "dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com"
)
artifact_bucket = (
f"{project_id}-dataproc-artifacts"
if is_dev
else "moz-fx-data-prod-airflow-dataproc-artifacts"
)
storage_bucket = (
f"{project_id}-dataproc-scratch"
if is_dev
else "moz-fx-data-prod-dataproc-scratch"
)
output_bucket = artifact_bucket if is_dev else "airflow-dataproc-bq-parquet-exports"
return DataprocParameters(
conn_id,
project_id,
is_dev,
client_email,
artifact_bucket,
storage_bucket,
output_bucket,
)
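# Sketch of how DataprocParameters is typically consumed by a DAG (illustrative;
# the job, cluster, and script names are hypothetical):
#
#   params = get_dataproc_parameters()
#   moz_dataproc_pyspark_runner(
#       parent_dag_name=dag.dag_id,
#       dag_name="run_dataproc_pyspark",
#       job_name="example_job",
#       default_args=default_args,
#       cluster_name="example-cluster-{{ ds_nodash }}",
#       python_driver_code=f"gs://{params.artifact_bucket}/jobs/example_job.py",
#       gcp_conn_id=params.conn_id,
#       project_id=params.project_id,
#   )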