Create guide for Dataproc Operators (#9037)
* added documentation for dataproc
* added more update information for updateMask
* Added link to information about cluster config api request
* Apply naming convention
* Set all dedents from 4 to 0
* Adjust dedent to 4, for operators
* removed dataproc guide from test_missing_guides
This commit is contained in:
Parent: a3fc8be34e
Commit: 29eb68b90b
@@ -42,6 +42,8 @@ SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]

CLUSTER = {
    "project_id": PROJECT_ID,
    "cluster_name": CLUSTER_NAME,
@@ -59,8 +61,10 @@ CLUSTER = {
    },
}

# [END how_to_cloud_dataproc_create_cluster]

# Update options
# [START how_to_cloud_dataproc_updatemask_cluster_operator]
CLUSTER_UPDATE = {
    "config": {
        "worker_config": {"num_instances": 3},
@@ -73,23 +77,28 @@ UPDATE_MASK = {
        "config.secondary_worker_config.num_instances",
    ]
}
# [END how_to_cloud_dataproc_updatemask_cluster_operator]

TIMEOUT = {"seconds": 1 * 24 * 60 * 60}


# Jobs definitions
# [START how_to_cloud_dataproc_pig_config]
PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
# [END how_to_cloud_dataproc_pig_config]

# [START how_to_cloud_dataproc_sparksql_config]
SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
# [END how_to_cloud_dataproc_sparksql_config]

# [START how_to_cloud_dataproc_spark_config]
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
@@ -98,25 +107,33 @@ SPARK_JOB = {
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
# [END how_to_cloud_dataproc_spark_config]

# [START how_to_cloud_dataproc_pyspark_config]
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
# [END how_to_cloud_dataproc_pyspark_config]

# [START how_to_cloud_dataproc_sparkr_config]
SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
}
# [END how_to_cloud_dataproc_sparkr_config]

# [START how_to_cloud_dataproc_hive_config]
HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
# [END how_to_cloud_dataproc_hive_config]

# [START how_to_cloud_dataproc_hadoop_config]
HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
@@ -125,16 +142,20 @@ HADOOP_JOB = {
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}
# [END how_to_cloud_dataproc_hadoop_config]

with models.DAG(
    "example_gcp_dataproc",
    default_args={"start_date": days_ago(1)},
    schedule_interval=None,
) as dag:
    # [START how_to_cloud_dataproc_create_cluster_operator]
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
    )
    # [END how_to_cloud_dataproc_create_cluster_operator]

    # [START how_to_cloud_dataproc_update_cluster_operator]
    scale_cluster = DataprocUpdateClusterOperator(
        task_id="scale_cluster",
        cluster_name=CLUSTER_NAME,
@@ -144,11 +165,11 @@ with models.DAG(
        project_id=PROJECT_ID,
        location=REGION,
    )
    # [END how_to_cloud_dataproc_update_cluster_operator]

    pig_task = DataprocSubmitJobOperator(
        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
    )

    spark_sql_task = DataprocSubmitJobOperator(
        task_id="spark_sql_task",
        job=SPARK_SQL_JOB,
@@ -160,9 +181,11 @@ with models.DAG(
        task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
    )

    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]

    sparkr_task = DataprocSubmitJobOperator(
        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
@@ -176,12 +199,14 @@ with models.DAG(
        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
    )

    # [START how_to_cloud_dataproc_delete_cluster_operator]
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
    )
    # [END how_to_cloud_dataproc_delete_cluster_operator]

    create_cluster >> scale_cluster
    scale_cluster >> hive_task >> delete_cluster

@@ -0,0 +1,188 @@
 .. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

 ..   http://www.apache.org/licenses/LICENSE-2.0

 .. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.

Google Cloud Dataproc Operators
===============================

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
take advantage of open source data tools for batch processing, querying, streaming and machine learning.
Dataproc automation helps you create clusters quickly, manage them easily, and
save money by turning clusters off when you don't need them.

For more information about the service, visit the `Dataproc product documentation <https://cloud.google.com/dataproc/docs/reference>`__.

.. contents::
  :depth: 1
  :local:

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst


.. _howto/operator:DataprocCreateClusterOperator:

Create a Cluster
----------------

Before you create a Dataproc cluster you need to define the cluster.
The cluster definition describes the identifying information, config, and status of a cluster of Compute Engine instances.
For more information about the available fields to pass when creating a cluster, visit the `Dataproc create cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__.

A cluster configuration can look as follows:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_create_cluster]
    :end-before: [END how_to_cloud_dataproc_create_cluster]

With this configuration we can create the cluster:
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]

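The two snippets above are pulled from the example DAG shipped with this commit. As a condensed sketch of how they fit together in a single file (the project, region and cluster values below, and the ``config`` fields, are illustrative placeholders rather than values taken from the example):

.. code-block:: python

    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
    from airflow.utils.dates import days_ago

    # Illustrative values; the example DAG reads these from environment variables.
    PROJECT_ID = "an-example-project"
    REGION = "europe-west1"
    CLUSTER_NAME = "example-cluster"

    CLUSTER = {
        "project_id": PROJECT_ID,
        "cluster_name": CLUSTER_NAME,
        # A real config usually carries more fields; see the Cluster API reference above.
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    }

    with models.DAG(
        "example_dataproc_sketch", default_args={"start_date": days_ago(1)}, schedule_interval=None
    ) as dag:
        create_cluster = DataprocCreateClusterOperator(
            task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
        )
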
Update a cluster
----------------
You can scale the cluster up or down by providing a cluster config and an updateMask.
In the updateMask argument you specify the path, relative to Cluster, of the field to update.
For more information on updateMask and other parameters take a look at the `Dataproc update cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__.

An example of a new cluster config and the updateMask:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_updatemask_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_updatemask_cluster_operator]

To update a cluster you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_update_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_update_cluster_operator]

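Put together, a scaling task can look roughly like the sketch below. The ``cluster``, ``update_mask`` and ``graceful_decommission_timeout`` keyword names are not visible in this commit's diff hunks, so treat them as an assumption to verify against the operator's signature.

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import DataprocUpdateClusterOperator

    # New desired state and the mask of fields to touch, mirroring
    # CLUSTER_UPDATE, UPDATE_MASK and TIMEOUT from the example DAG.
    CLUSTER_UPDATE = {"config": {"worker_config": {"num_instances": 3}}}
    UPDATE_MASK = {"paths": ["config.worker_config.num_instances"]}
    TIMEOUT = {"seconds": 1 * 24 * 60 * 60}

    # PROJECT_ID, REGION and CLUSTER_NAME as defined for the create-cluster sketch above.
    scale_cluster = DataprocUpdateClusterOperator(
        task_id="scale_cluster",
        cluster_name=CLUSTER_NAME,
        cluster=CLUSTER_UPDATE,  # assumed keyword name
        update_mask=UPDATE_MASK,  # assumed keyword name
        graceful_decommission_timeout=TIMEOUT,  # assumed keyword name
        project_id=PROJECT_ID,
        location=REGION,
    )
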
Deleting a cluster
------------------

To delete a cluster you can use:

:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_delete_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_delete_cluster_operator]

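In the example DAG the delete task is wired in last, so the cluster is created, scaled, used by the job tasks and only then removed. The ordering below is taken directly from that DAG (``hive_task`` is one of its submit tasks):

.. code-block:: python

    # Dependency chain from the example DAG in this commit.
    create_cluster >> scale_cluster
    scale_cluster >> hive_task >> delete_cluster
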
Submit a job to a cluster
-------------------------

Dataproc supports submitting jobs of different big data components.
The list currently includes Spark, Hadoop, Pig and Hive.
For more information on versions and images take a look at the `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__.

To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, on the cluster, or on your local
file system. You can specify a ``file:///`` path to refer to a local file on a cluster's master node.

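As an illustration of the ``file:///`` case, a Spark job config could point at a jar that already exists on the master node. The path below is hypothetical and only shows the shape of such a config:

.. code-block:: python

    # PROJECT_ID and CLUSTER_NAME as defined earlier in this guide's sketches.
    SPARK_JOB_LOCAL_JAR = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "spark_job": {
            # file:/// resolves against the master node's local file system;
            # the jar path here is a placeholder.
            "main_jar_file_uri": "file:///usr/lib/spark/examples/jars/spark-examples.jar",
        },
    }
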
The job configuration can be submitted by using:
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_submit_job_to_cluster_operator]

Examples of job configurations to submit
----------------------------------------

We have provided an example for every framework below.
There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at
`Dataproc Job arguments <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs>`__.

Example of the configuration for a PySpark Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_pyspark_config]
    :end-before: [END how_to_cloud_dataproc_pyspark_config]

Example of the configuration for a SparkSQL Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_sparksql_config]
    :end-before: [END how_to_cloud_dataproc_sparksql_config]

Example of the configuration for a Spark Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_spark_config]
    :end-before: [END how_to_cloud_dataproc_spark_config]

Example of the configuration for a Hive Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_hive_config]
    :end-before: [END how_to_cloud_dataproc_hive_config]

Example of the configuration for a Hadoop Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_hadoop_config]
    :end-before: [END how_to_cloud_dataproc_hadoop_config]

Example of the configuration for a Pig Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_pig_config]
    :end-before: [END how_to_cloud_dataproc_pig_config]


Example of the configuration for a SparkR Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_sparkr_config]
    :end-before: [END how_to_cloud_dataproc_sparkr_config]

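Each configuration above is a plain dict handed to the same operator. The sketch below mirrors how the example DAG submits its jobs (``hive_task`` shown here; the other tasks differ only in ``task_id`` and ``job``):

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    hive_task = DataprocSubmitJobOperator(
        task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
    )
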
References
^^^^^^^^^^
For further information, take a look at:

* `Dataproc API documentation <https://cloud.google.com/dataproc/docs/reference>`__
* `Product documentation <https://cloud.google.com/dataproc/docs/reference>`__

@@ -697,7 +697,7 @@ These integrations allow you to perform various operations within the Google Cloud
     -

   * - `Dataproc <https://cloud.google.com/dataproc/>`__
     -
     - :doc:`How to use <howto/operator/gcp/dataproc>`
     - :mod:`airflow.providers.google.cloud.hooks.dataproc`
     - :mod:`airflow.providers.google.cloud.operators.dataproc`
     -

@@ -144,7 +144,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
        'bigquery_to_mysql',
        'cassandra_to_gcs',
        'dataflow',
        'dataproc',
        'datastore',
        'dlp',
        'gcs_to_bigquery',