diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index b6e1070df7..55dbff5d07 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -42,6 +42,8 @@ SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
 SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 # Cluster definition
+# [START how_to_cloud_dataproc_create_cluster]
+
 CLUSTER = {
     "project_id": PROJECT_ID,
     "cluster_name": CLUSTER_NAME,
@@ -59,8 +61,10 @@ CLUSTER = {
     },
 }
 
+# [END how_to_cloud_dataproc_create_cluster]
 
 # Update options
+# [START how_to_cloud_dataproc_updatemask_cluster_operator]
 CLUSTER_UPDATE = {
     "config": {
         "worker_config": {"num_instances": 3},
@@ -73,23 +77,28 @@ UPDATE_MASK = {
         "config.secondary_worker_config.num_instances",
     ]
 }
+# [END how_to_cloud_dataproc_updatemask_cluster_operator]
 
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
 
-
 # Jobs definitions
+# [START how_to_cloud_dataproc_pig_config]
 PIG_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
 }
+# [END how_to_cloud_dataproc_pig_config]
 
+# [START how_to_cloud_dataproc_sparksql_config]
 SPARK_SQL_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_sparksql_config]
 
+# [START how_to_cloud_dataproc_spark_config]
 SPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -98,25 +107,33 @@ SPARK_JOB = {
         "main_class": "org.apache.spark.examples.SparkPi",
     },
 }
+# [END how_to_cloud_dataproc_spark_config]
 
+# [START how_to_cloud_dataproc_pyspark_config]
 PYSPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
 }
+# [END how_to_cloud_dataproc_pyspark_config]
 
+# [START how_to_cloud_dataproc_sparkr_config]
 SPARKR_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_r_job": {"main_r_file_uri": SPARKR_URI},
 }
+# [END how_to_cloud_dataproc_sparkr_config]
 
+# [START how_to_cloud_dataproc_hive_config]
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_hive_config]
 
+# [START how_to_cloud_dataproc_hadoop_config]
 HADOOP_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -125,16 +142,20 @@ HADOOP_JOB = {
         "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
     },
 }
+# [END how_to_cloud_dataproc_hadoop_config]
 
 with models.DAG(
     "example_gcp_dataproc",
     default_args={"start_date": days_ago(1)},
     schedule_interval=None,
 ) as dag:
+    # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
     )
+    # [END how_to_cloud_dataproc_create_cluster_operator]
 
+    # [START how_to_cloud_dataproc_update_cluster_operator]
     scale_cluster = DataprocUpdateClusterOperator(
         task_id="scale_cluster",
         cluster_name=CLUSTER_NAME,
@@ -144,11 +165,11 @@ with models.DAG(
         project_id=PROJECT_ID,
         location=REGION,
     )
+    # [END how_to_cloud_dataproc_update_cluster_operator]
 
     pig_task = DataprocSubmitJobOperator(
         task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
     )
-
     spark_sql_task = DataprocSubmitJobOperator(
         task_id="spark_sql_task",
         job=SPARK_SQL_JOB,
@@ -160,9 +181,11 @@ with models.DAG(
         task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
     pyspark_task = DataprocSubmitJobOperator(
         task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
+    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
 
     sparkr_task = DataprocSubmitJobOperator(
         task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
@@ -176,12 +199,14 @@ with models.DAG(
         task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_delete_cluster_operator]
     delete_cluster = DataprocDeleteClusterOperator(
         task_id="delete_cluster",
         project_id=PROJECT_ID,
         cluster_name=CLUSTER_NAME,
         region=REGION,
     )
+    # [END how_to_cloud_dataproc_delete_cluster_operator]
 
     create_cluster >> scale_cluster
     scale_cluster >> hive_task >> delete_cluster
diff --git a/docs/howto/operator/gcp/dataproc.rst b/docs/howto/operator/gcp/dataproc.rst
new file mode 100644
index 0000000000..01e1f4aae5
--- /dev/null
+++ b/docs/howto/operator/gcp/dataproc.rst
@@ -0,0 +1,188 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Dataproc Operators
+===============================
+
+Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
+take advantage of open source data tools for batch processing, querying, streaming and machine learning.
+Dataproc automation helps you create clusters quickly, manage them easily, and
+save money by turning clusters off when you don't need them.
+
+For more information about the service visit the `Dataproc product documentation `__
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:DataprocCreateClusterOperator:
+
+Create a Cluster
+----------------
+
+Before you create a Dataproc cluster you need to define the cluster configuration.
+It describes the identifying information, config, and status of a cluster of Compute Engine instances.
+For more information about the available fields to pass when creating a cluster, visit the `Dataproc create cluster API `__.
+
+A cluster configuration can look as follows:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_create_cluster]
+    :end-before: [END how_to_cloud_dataproc_create_cluster]
+
+With this configuration we can create the cluster:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]
+
+Update a cluster
+----------------
+You can scale the cluster up or down by providing a cluster config and an updateMask.
+In the updateMask argument you specify the path, relative to Cluster, of the field to update.
+For more information on updateMask and other parameters take a look at the `Dataproc update cluster API `__.
+
+An example of a new cluster config and the updateMask:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_updatemask_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_updatemask_cluster_operator]
+
+To update a cluster you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_update_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_update_cluster_operator]
+
+Deleting a cluster
+------------------
+
+To delete a cluster you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_delete_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_delete_cluster_operator]
+
+Submit a job to a cluster
+-------------------------
+
+Dataproc supports submitting jobs for different big data components.
+The list currently includes Spark, Hadoop, Pig and Hive.
+For more information on versions and images take a look at the `Cloud Dataproc Image version list `__.
+
+To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, on the cluster or on your local
+file system. You can specify a file:/// path to refer to a local file on a cluster's master node.
+
+The job configuration can be submitted by using:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
+    :end-before: [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
+
+Examples of job configurations to submit
+----------------------------------------
+
+We have provided an example for every framework below.
+There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at
+`DataProc Job arguments `__.
+
+Example of the configuration for a PySpark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_pyspark_config]
+    :end-before: [END how_to_cloud_dataproc_pyspark_config]
+
+Example of the configuration for a SparkSQL Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_sparksql_config]
+    :end-before: [END how_to_cloud_dataproc_sparksql_config]
+
+Example of the configuration for a Spark Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_spark_config]
+    :end-before: [END how_to_cloud_dataproc_spark_config]
+
+Example of the configuration for a Hive Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_hive_config]
+    :end-before: [END how_to_cloud_dataproc_hive_config]
+
+Example of the configuration for a Hadoop Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_hadoop_config]
+    :end-before: [END how_to_cloud_dataproc_hadoop_config]
+
+Example of the configuration for a Pig Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_pig_config]
+    :end-before: [END how_to_cloud_dataproc_pig_config]
+
+
+Example of the configuration for a SparkR Job:
+
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_sparkr_config]
+    :end-before: [END how_to_cloud_dataproc_sparkr_config]
+
+References
+^^^^^^^^^^
+For further information, take a look at:
+
+* `DataProc API documentation `__
+* `Product documentation `__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 3eaf1a0601..48aeea2241 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -697,7 +697,7 @@ These integrations allow you to perform various operations within the Google Clo
      -
 
    * - `Dataproc `__
-     -
+     - :doc:`How to use `
      - :mod:`airflow.providers.google.cloud.hooks.dataproc`
      - :mod:`airflow.providers.google.cloud.operators.dataproc`
      -
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index 8f834d5ba1..bef70b9af0 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -144,7 +144,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
         'bigquery_to_mysql',
         'cassandra_to_gcs',
         'dataflow',
-        'dataproc',
         'datastore',
         'dlp',
         'gcs_to_bigquery',