Split and improve BigQuery example DAG (#8529)

* Simplify existing example DAG and add consistent UI colors

* Move BigQueryConsoleLink to top

* Split DAG into two separate DAGs

* Add transfer example

* Fix check interval operator

Fixes the BigQuery error: Could not cast literal {{ macros.ds_add(ds, -1) }} to type DATE at [1:51]

* Add BigQuery queries example

* fixup! Add BigQuery queries example
This commit is contained in:
Tomek Urbaszek 2020-04-28 14:29:09 +02:00 committed by GitHub
Parent c1fb28230f
Commit 992a24ce41
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 749 additions and 415 deletions

View file

@ -1,341 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
import time
from urllib.parse import urlparse
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator, BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator,
BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator,
BigQueryExecuteQueryOperator, BigQueryGetDataOperator, BigQueryGetDatasetOperator,
BigQueryGetDatasetTablesOperator, BigQueryIntervalCheckOperator, BigQueryPatchDatasetOperator,
BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator, BigQueryValueCheckOperator,
)
from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.utils.dates import days_ago
# 0x06012c8cf97BEaD5deAe237070F9587f8E7A266d = CryptoKitties contract address
WALLET_ADDRESS = os.environ.get("GCP_ETH_WALLET_ADDRESS", "0x06012c8cf97BEaD5deAe237070F9587f8E7A266d")
default_args = {"start_date": days_ago(1)}
# [START howto_operator_bigquery_query]
MOST_VALUABLE_INCOMING_TRANSACTIONS = """
SELECT
value, to_address
FROM
`bigquery-public-data.ethereum_blockchain.transactions`
WHERE
1 = 1
AND DATE(block_timestamp) = "{{ ds }}"
AND to_address = LOWER(@to_address)
ORDER BY value DESC
LIMIT 1000
"""
# [END howto_operator_bigquery_query]
MOST_ACTIVE_PLAYERS = """
SELECT
COUNT(from_address)
, from_address
FROM
`bigquery-public-data.ethereum_blockchain.transactions`
WHERE
1 = 1
AND DATE(block_timestamp) = "{{ ds }}"
AND to_address = LOWER(@to_address)
GROUP BY from_address
ORDER BY COUNT(from_address) DESC
LIMIT 1000
"""
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
BQ_LOCATION = "europe-north1"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")
LOCATION_DATASET_NAME = "{}_location".format(DATASET_NAME)
DATA_SAMPLE_GCS_URL = os.environ.get(
"GCP_BIGQUERY_DATA_GCS_URL", "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
)
DATA_SAMPLE_GCS_URL_PARTS = urlparse(DATA_SAMPLE_GCS_URL)
DATA_SAMPLE_GCS_BUCKET_NAME = DATA_SAMPLE_GCS_URL_PARTS.netloc
DATA_SAMPLE_GCS_OBJECT_NAME = DATA_SAMPLE_GCS_URL_PARTS.path[1:]
DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-sample-data")
with models.DAG(
"example_bigquery",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag:
# [START howto_operator_bigquery_execute_query]
execute_query = BigQueryExecuteQueryOperator(
task_id="execute_query",
sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
use_legacy_sql=False,
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)
# [END howto_operator_bigquery_execute_query]
# [START howto_operator_bigquery_execute_query_list]
bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
task_id="execute_multi_query",
sql=[MOST_VALUABLE_INCOMING_TRANSACTIONS, MOST_ACTIVE_PLAYERS],
use_legacy_sql=False,
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)
# [END howto_operator_bigquery_execute_query_list]
# [START howto_operator_bigquery_execute_query_save]
execute_query_save = BigQueryExecuteQueryOperator(
task_id="execute_query_save",
sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
use_legacy_sql=False,
destination_dataset_table="{}.save_query_result".format(DATASET_NAME),
query_params=[
{
"name": "to_address",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": WALLET_ADDRESS},
}
],
)
# [END howto_operator_bigquery_execute_query_save]
# [START howto_operator_bigquery_get_data]
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id="save_query_result",
max_results="10",
selected_fields="value,to_address",
)
# [END howto_operator_bigquery_get_data]
get_data_result = BashOperator(
task_id="get_data_result", bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\""
)
# [START howto_operator_bigquery_create_external_table]
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
destination_project_dataset_table="{}.external_table".format(DATASET_NAME),
skip_leading_rows=1,
schema_fields=[{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}],
)
# [END howto_operator_bigquery_create_external_table]
execute_query_external_table = BigQueryExecuteQueryOperator(
task_id="execute_query_external_table",
destination_dataset_table="{}.selected_data_from_external_table".format(DATASET_NAME),
sql='SELECT * FROM `{}.external_table` WHERE name LIKE "W%"'.format(DATASET_NAME),
use_legacy_sql=False,
)
copy_from_selected_data = BigQueryToBigQueryOperator(
task_id="copy_from_selected_data",
source_project_dataset_tables="{}.selected_data_from_external_table".format(DATASET_NAME),
destination_project_dataset_table="{}.copy_of_selected_data_from_external_table".format(DATASET_NAME),
)
bigquery_to_gcs = BigQueryToGCSOperator(
task_id="bigquery_to_gcs",
source_project_dataset_table="{}.selected_data_from_external_table".format(DATASET_NAME),
destination_cloud_storage_uris=["gs://{}/export-bigquery.csv".format(DATA_EXPORT_BUCKET_NAME)],
)
# [START howto_operator_bigquery_create_dataset]
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
# [END howto_operator_bigquery_create_dataset]
create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset_with_location",
dataset_id=LOCATION_DATASET_NAME,
location=BQ_LOCATION
)
# [START howto_operator_bigquery_create_table]
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
# [END howto_operator_bigquery_create_table]
create_table_with_location = BigQueryCreateEmptyTableOperator(
task_id="create_table_with_location",
dataset_id=LOCATION_DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
# [START howto_operator_bigquery_create_view]
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False
}
)
# [END howto_operator_bigquery_create_view]
get_empty_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_empty_dataset_tables",
dataset_id=DATASET_NAME
)
# [START howto_operator_bigquery_get_dataset_tables]
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables",
dataset_id=DATASET_NAME
)
# [END howto_operator_bigquery_get_dataset_tables]
# [START howto_operator_bigquery_delete_view]
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view", deletion_dataset_table="{}.test_view".format(DATASET_NAME)
)
# [END howto_operator_bigquery_delete_view]
# [START howto_operator_bigquery_delete_table]
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table", deletion_dataset_table="{}.test_table".format(DATASET_NAME)
)
# [END howto_operator_bigquery_delete_table]
# [START howto_operator_bigquery_get_dataset]
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
# [END howto_operator_bigquery_get_dataset]
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
)
# [START howto_operator_bigquery_patch_dataset]
patch_dataset = BigQueryPatchDatasetOperator(
task_id="patch_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"friendlyName": "Patched Dataset", "description": "Patched dataset"},
)
# [END howto_operator_bigquery_patch_dataset]
# [START howto_operator_bigquery_update_dataset]
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"}
)
# [END howto_operator_bigquery_update_dataset]
# [START howto_operator_bigquery_delete_dataset]
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
# [END howto_operator_bigquery_delete_dataset]
delete_dataset_with_location = BigQueryDeleteDatasetOperator(
task_id="delete_dataset_with_location",
dataset_id=LOCATION_DATASET_NAME,
delete_contents=True
)
# [START howto_operator_bigquery_upsert_table]
update_table = BigQueryUpsertTableOperator(
task_id="update_table", dataset_id=DATASET_NAME, table_resource={
"tableReference": {
"tableId": "test_table_id"
},
"expirationTime": (int(time.time()) + 300) * 1000
}
)
# [END howto_operator_bigquery_upsert_table]
# [START howto_operator_bigquery_check]
check_count = BigQueryCheckOperator(
task_id="check_count",
sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
use_legacy_sql=False,
)
# [END howto_operator_bigquery_check]
# [START howto_operator_bigquery_value_check]
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
pass_value=1000,
use_legacy_sql=False,
)
# [END howto_operator_bigquery_value_check]
# [START howto_operator_bigquery_interval_check]
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table="{}.save_query_result".format(DATASET_NAME),
days_back=1,
metrics_thresholds={'COUNT(*)': 1.5},
use_legacy_sql=False,
)
# [END howto_operator_bigquery_interval_check]
create_dataset >> execute_query_save >> delete_dataset
create_dataset >> get_empty_dataset_tables >> create_table >> get_dataset_tables >> delete_dataset
create_dataset >> get_dataset >> delete_dataset
create_dataset >> patch_dataset >> update_dataset >> delete_dataset
execute_query_save >> get_data >> get_dataset_result
get_data >> delete_dataset
create_dataset >> create_external_table >> execute_query_external_table >> \
copy_from_selected_data >> delete_dataset
execute_query_external_table >> bigquery_to_gcs >> delete_dataset
create_table >> create_view >> delete_view >> delete_table >> delete_dataset
create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location
create_dataset >> create_table >> update_table >> delete_table >> delete_dataset
create_dataset >> execute_query_save >> check_count >> check_value >> \
check_interval >> delete_dataset

View file

@ -0,0 +1,204 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
import time
from urllib.parse import urlparse
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryCreateExternalTableOperator,
BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, BigQueryGetDatasetOperator,
BigQueryGetDatasetTablesOperator, BigQueryPatchDatasetOperator, BigQueryUpdateDatasetOperator,
BigQueryUpsertTableOperator,
)
from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)}
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
BQ_LOCATION = "europe-north1"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")
LOCATION_DATASET_NAME = "{}_location".format(DATASET_NAME)
DATA_SAMPLE_GCS_URL = os.environ.get(
"GCP_BIGQUERY_DATA_GCS_URL",
"gs://cloud-samples-data/bigquery/us-states/us-states.csv",
)
DATA_SAMPLE_GCS_URL_PARTS = urlparse(DATA_SAMPLE_GCS_URL)
DATA_SAMPLE_GCS_BUCKET_NAME = DATA_SAMPLE_GCS_URL_PARTS.netloc
DATA_SAMPLE_GCS_OBJECT_NAME = DATA_SAMPLE_GCS_URL_PARTS.path[1:]
with models.DAG(
"example_bigquery_operations",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
# [START howto_operator_bigquery_create_table]
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
# [END howto_operator_bigquery_create_table]
# [START howto_operator_bigquery_delete_table]
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table="{}.test_table".format(DATASET_NAME),
)
# [END howto_operator_bigquery_delete_table]
# [START howto_operator_bigquery_create_view]
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
)
# [END howto_operator_bigquery_create_view]
# [START howto_operator_bigquery_delete_view]
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view", deletion_dataset_table=f"{DATASET_NAME}.test_view"
)
# [END howto_operator_bigquery_delete_view]
# [START howto_operator_bigquery_create_external_table]
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
skip_leading_rows=1,
schema_fields=[
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
],
)
# [END howto_operator_bigquery_create_external_table]
# [START howto_operator_bigquery_upsert_table]
update_table = BigQueryUpsertTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
# [END howto_operator_bigquery_upsert_table]
# [START howto_operator_bigquery_create_dataset]
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create-dataset", dataset_id=DATASET_NAME
)
# [END howto_operator_bigquery_create_dataset]
# [START howto_operator_bigquery_get_dataset_tables]
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
# [END howto_operator_bigquery_get_dataset_tables]
# [START howto_operator_bigquery_get_dataset]
get_dataset = BigQueryGetDatasetOperator(
task_id="get-dataset", dataset_id=DATASET_NAME
)
# [END howto_operator_bigquery_get_dataset]
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
)
# [START howto_operator_bigquery_patch_dataset]
patch_dataset = BigQueryPatchDatasetOperator(
task_id="patch_dataset",
dataset_id=DATASET_NAME,
dataset_resource={
"friendlyName": "Patched Dataset",
"description": "Patched dataset",
},
)
# [END howto_operator_bigquery_patch_dataset]
# [START howto_operator_bigquery_update_dataset]
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
# [END howto_operator_bigquery_update_dataset]
# [START howto_operator_bigquery_delete_dataset]
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
# [END howto_operator_bigquery_delete_dataset]
create_dataset >> patch_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset
update_dataset >> create_table >> create_view >> [
get_dataset_tables,
delete_view,
] >> update_table >> delete_table >> delete_dataset
update_dataset >> create_external_table >> delete_dataset
with models.DAG(
"example_bigquery_operations_location",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
):
create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset_with_location",
dataset_id=LOCATION_DATASET_NAME,
location=BQ_LOCATION,
)
create_table_with_location = BigQueryCreateEmptyTableOperator(
task_id="create_table_with_location",
dataset_id=LOCATION_DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
delete_dataset_with_location = BigQueryDeleteDatasetOperator(
task_id="delete_dataset_with_location",
dataset_id=LOCATION_DATASET_NAME,
delete_contents=True,
)
create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location

View file

@ -0,0 +1,159 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
from datetime import datetime
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator, BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator,
BigQueryDeleteDatasetOperator, BigQueryExecuteQueryOperator, BigQueryGetDataOperator,
BigQueryIntervalCheckOperator, BigQueryValueCheckOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")
TABLE_1 = "table1"
TABLE_2 = "table2"
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
# [START howto_operator_bigquery_query]
INSERT_ROWS_QUERY = f"""
INSERT INTO {DATASET_NAME}.{TABLE_1} VALUES (42, "monthy python", "{INSERT_DATE}");
INSERT INTO {DATASET_NAME}.{TABLE_1} VALUES (42, "fishy fish", "{INSERT_DATE}");
"""
# [END howto_operator_bigquery_query]
SCHEMA = [
{"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "ds", "type": "DATE", "mode": "NULLABLE"},
]
default_args = {"start_date": days_ago(1)}
with models.DAG(
"example_bigquery_queries",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create-dataset", dataset_id=DATASET_NAME
)
create_table_1 = BigQueryCreateEmptyTableOperator(
task_id="create_table_1",
dataset_id=DATASET_NAME,
table_id=TABLE_1,
schema_fields=SCHEMA,
)
create_table_2 = BigQueryCreateEmptyTableOperator(
task_id="create_table_2",
dataset_id=DATASET_NAME,
table_id=TABLE_2,
schema_fields=SCHEMA,
)
create_dataset >> [create_table_1, create_table_2]
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
# [START howto_operator_bigquery_execute_query]
execute_insert_query = BigQueryExecuteQueryOperator(
task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, use_legacy_sql=False
)
# [END howto_operator_bigquery_execute_query]
# [START howto_operator_bigquery_execute_query_list]
bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
task_id="execute_multi_query",
sql=[
f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
],
use_legacy_sql=False,
)
# [END howto_operator_bigquery_execute_query_list]
# [START howto_operator_bigquery_execute_query_save]
execute_query_save = BigQueryExecuteQueryOperator(
task_id="execute_query_save",
sql=f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
destination_dataset_table=f"{DATASET_NAME}.{TABLE_2}",
)
# [END howto_operator_bigquery_execute_query_save]
# [START howto_operator_bigquery_get_data]
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_1,
max_results="10",
selected_fields="value,name",
)
# [END howto_operator_bigquery_get_data]
get_data_result = BashOperator(
task_id="get_data_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
)
# [START howto_operator_bigquery_check]
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
)
# [END howto_operator_bigquery_check]
# [START howto_operator_bigquery_value_check]
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
pass_value=2,
use_legacy_sql=False,
)
# [END howto_operator_bigquery_value_check]
# [START howto_operator_bigquery_interval_check]
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
)
# [END howto_operator_bigquery_interval_check]
[create_table_1, create_table_2] >> execute_insert_query
execute_insert_query >> get_data >> get_data_result >> delete_dataset
execute_insert_query >> execute_query_save >> bigquery_execute_multi_query >> delete_dataset
execute_insert_query >> [check_count, check_value, check_interval] >> delete_dataset

View file

@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
ORIGIN = "origin"
TARGET = "target"
default_args = {"start_date": days_ago(1)}
with models.DAG(
"example_bigquery_to_bigquery",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
copy_selected_data = BigQueryToBigQueryOperator(
task_id="copy_selected_data",
source_project_dataset_tables=f"{DATASET_NAME}.{ORIGIN}",
destination_project_dataset_table=f"{DATASET_NAME}.{TARGET}",
)
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset", dataset_id=DATASET_NAME
)
for table in [ORIGIN, TARGET]:
create_table = BigQueryCreateEmptyTableOperator(
task_id=f"create_{table}_table",
dataset_id=DATASET_NAME,
table_id=table,
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
create_dataset >> create_table >> copy_selected_data
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
copy_selected_data >> delete_dataset

View file

@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
DATA_EXPORT_BUCKET_NAME = os.environ.get(
"GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-gcs-data"
)
TABLE = "table_42"
default_args = {"start_date": days_ago(1)}
with models.DAG(
"example_bigquery_to_gcs",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
bigquery_to_gcs = BigQueryToGCSOperator(
task_id="bigquery_to_gcs",
source_project_dataset_table=f"{DATASET_NAME}.{TABLE}",
destination_cloud_storage_uris=[
f"gs://{DATA_EXPORT_BUCKET_NAME}/export-bigquery.csv"
],
)
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset", dataset_id=DATASET_NAME
)
create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
dataset_id=DATASET_NAME,
table_id=TABLE,
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
create_dataset >> create_table >> bigquery_to_gcs
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
bigquery_to_gcs >> delete_dataset

View file

@ -0,0 +1,82 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google BigQuery service.
"""
import os
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
DATA_EXPORT_BUCKET_NAME = os.environ.get(
"GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-sample-data"
)
ORIGIN = "origin"
TARGET = "target"
default_args = {"start_date": days_ago(1)}
with models.DAG(
"example_bigquery_transfer",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
copy_selected_data = BigQueryToBigQueryOperator(
task_id="copy_selected_data",
source_project_dataset_tables=f"{DATASET_NAME}.{ORIGIN}",
destination_project_dataset_table=f"{DATASET_NAME}.{TARGET}",
)
bigquery_to_gcs = BigQueryToGCSOperator(
task_id="bigquery_to_gcs",
source_project_dataset_table=f"{DATASET_NAME}.{ORIGIN}",
destination_cloud_storage_uris=[
f"gs://{DATA_EXPORT_BUCKET_NAME}/export-bigquery.csv"
],
)
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset", dataset_id=DATASET_NAME
)
for table in [ORIGIN, TARGET]:
create_table = BigQueryCreateEmptyTableOperator(
task_id=f"create_{table}_table",
dataset_id=DATASET_NAME,
table_id=table,
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
create_dataset >> create_table >> [copy_selected_data, bigquery_to_gcs]
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
[copy_selected_data, bigquery_to_gcs] >> delete_dataset

View file

@ -20,7 +20,7 @@
"""
This module contains Google BigQuery operators.
"""
import enum
import json
import warnings
from typing import Any, Dict, Iterable, List, Optional, SupportsAbs, Union
@ -39,6 +39,49 @@ from airflow.utils.decorators import apply_defaults
BIGQUERY_JOB_DETAILS_LINK_FMT = 'https://console.cloud.google.com/bigquery?j={job_id}'
class BigQueryUIColors(enum.Enum):
"""Hex colors for BigQuery operators"""
CHECK = "#C0D7FF"
QUERY = "#A1BBFF"
TABLE = "#81A0FF"
DATASET = "#5F86FF"
class BigQueryConsoleLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
name = 'BigQuery Console'
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ''
@attr.s(auto_attribs=True)
class BigQueryConsoleIndexableLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
index: int = attr.ib()
@property
def name(self) -> str:
return 'BigQuery Console #{index}'.format(index=self.index + 1)
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
job_ids = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
class BigQueryCheckOperator(CheckOperator):
"""
Performs checks against BigQuery. The ``BigQueryCheckOperator`` expects
@ -85,6 +128,7 @@ class BigQueryCheckOperator(CheckOperator):
template_fields = ('sql', 'gcp_conn_id',)
template_ext = ('.sql',)
ui_color = BigQueryUIColors.CHECK.value
@apply_defaults
def __init__(self,
@ -130,6 +174,7 @@ class BigQueryValueCheckOperator(ValueCheckOperator):
template_fields = ('sql', 'gcp_conn_id', 'pass_value',)
template_ext = ('.sql',)
ui_color = BigQueryUIColors.CHECK.value
@apply_defaults
def __init__(self, sql: str,
@ -186,7 +231,8 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
:type bigquery_conn_id: str
"""
template_fields = ('table', 'gcp_conn_id',)
template_fields = ('table', 'gcp_conn_id', 'sql1', 'sql2')
ui_color = BigQueryUIColors.CHECK.value
@apply_defaults
def __init__(self,
@ -269,7 +315,7 @@ class BigQueryGetDataOperator(BaseOperator):
:type location: str
"""
template_fields = ('dataset_id', 'table_id', 'max_results')
ui_color = '#e4f0e8'
ui_color = BigQueryUIColors.QUERY.value
@apply_defaults
def __init__(self,
@ -330,41 +376,6 @@ class BigQueryGetDataOperator(BaseOperator):
return table_data
class BigQueryConsoleLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
name = 'BigQuery Console'
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ''
@attr.s(auto_attribs=True)
class BigQueryConsoleIndexableLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
index: int = attr.ib()
@property
def name(self) -> str:
return 'BigQuery Console #{index}'.format(index=self.index + 1)
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
job_ids = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
# pylint: disable=too-many-instance-attributes
class BigQueryExecuteQueryOperator(BaseOperator):
"""
@ -461,7 +472,7 @@ class BigQueryExecuteQueryOperator(BaseOperator):
template_fields = ('sql', 'destination_dataset_table', 'labels', 'query_params')
template_ext = ('.sql', )
ui_color = '#e4f0e8'
ui_color = BigQueryUIColors.QUERY.value
@property
def operator_extra_links(self):
@ -710,7 +721,7 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'table_id', 'project_id',
'gcs_schema_object', 'labels', 'view')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.TABLE.value
# pylint: disable=too-many-arguments
@apply_defaults
@ -875,7 +886,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
"""
template_fields = ('bucket', 'source_objects',
'schema_object', 'destination_project_dataset_table', 'labels')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.TABLE.value
# pylint: disable=too-many-arguments
@apply_defaults
@ -1001,7 +1012,7 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f00004'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1076,7 +1087,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1142,7 +1153,7 @@ class BigQueryGetDatasetOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1194,7 +1205,7 @@ class BigQueryGetDatasetTablesOperator(BaseOperator):
.. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list#response-body
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f00004'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1247,7 +1258,7 @@ class BigQueryPatchDatasetOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1298,7 +1309,7 @@ class BigQueryUpdateDatasetOperator(BaseOperator):
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'
ui_color = BigQueryUIColors.DATASET.value
@apply_defaults
def __init__(self,
@ -1356,7 +1367,7 @@ class BigQueryDeleteTableOperator(BaseOperator):
:type location: str
"""
template_fields = ('deletion_dataset_table',)
ui_color = '#ffd1dc'
ui_color = BigQueryUIColors.TABLE.value
@apply_defaults
def __init__(self,
@ -1419,6 +1430,7 @@ class BigQueryUpsertTableOperator(BaseOperator):
:type location: str
"""
template_fields = ('dataset_id', 'table_resource',)
ui_color = BigQueryUIColors.TABLE.value
@apply_defaults
def __init__(self,

View file

@ -46,7 +46,7 @@ Create dataset
To create an empty dataset in a BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_dataset]
@ -62,7 +62,7 @@ To get the details of an existing dataset you can use
This operator returns a `Dataset Resource <https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource>`__.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_get_dataset]
@ -76,7 +76,7 @@ List tables in dataset
To retrieve the list of tables in a given dataset use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_get_dataset_tables]
@ -93,7 +93,7 @@ To patch a dataset in BigQuery you can use
Note, this operator only replaces fields that are provided in the submitted dataset
resource.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_patch_dataset]
@ -110,7 +110,7 @@ To update a dataset in BigQuery you can use
The update method replaces the entire dataset resource, whereas the patch
method only replaces fields that are provided in the submitted dataset resource.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_update_dataset]
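
Since the update method replaces the whole resource, any field you want to keep should travel along in ``dataset_resource``; a sketch (the label key and value are placeholders, not part of the example DAG):

.. code-block:: python

    update_dataset_full = BigQueryUpdateDatasetOperator(
        task_id="update_dataset_full",
        dataset_id=DATASET_NAME,
        dataset_resource={
            "description": "Updated dataset",
            "labels": {"team": "data-eng"},  # placeholder label, shown only to illustrate extra fields
        },
    )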
@ -124,7 +124,7 @@ Delete dataset
To delete an existing dataset from a BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_dataset]
@ -147,7 +147,7 @@ ways. You may either directly pass the schema fields in, or you may point the
operator to a Google Cloud Storage object name. The object in Google Cloud
Storage must be a JSON file with the schema fields in it.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_table]
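
As a sketch of the second option, the schema can come from a JSON file in Google Cloud Storage via ``gcs_schema_object`` (the bucket and object path below are placeholders):

.. code-block:: python

    create_table_from_gcs_schema = BigQueryCreateEmptyTableOperator(
        task_id="create_table_from_gcs_schema",
        dataset_id=DATASET_NAME,
        table_id="test_table",
        gcs_schema_object="gs://my-schema-bucket/schemas/test_table.json",  # placeholder path to a JSON schema file
    )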
@ -155,7 +155,7 @@ Storage must be a JSON file with the schema fields in it.
You can use this operator to create a view on top of an existing table.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_view]
@ -175,7 +175,7 @@ Similarly to
you may either directly pass the schema fields in, or you may point the operator
to a Google Cloud Storage object name.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_external_table]
@ -196,7 +196,7 @@ returned list will be equal to the number of rows fetched. Each element in the
list will again be a list where elements would represent the column values for
that row.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_get_data]
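
The fetched rows end up in XCom, so a downstream task can consume them; the example DAG does this with a plain ``BashOperator``, roughly:

.. code-block:: python

    get_data_result = BashOperator(
        task_id="get_data_result",
        bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
    )

    get_data >> get_data_result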
@ -213,7 +213,7 @@ To upsert a table you can use
This operator either updates the existing table or creates a new, empty table
in the given dataset.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_upsert_table]
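
``table_resource`` takes the shape of a BigQuery table resource (the example above passes ``tableReference`` and ``expirationTime``), so the same operator can, for instance, upsert a view definition; a sketch reusing the constants from the example DAG:

.. code-block:: python

    upsert_view = BigQueryUpsertTableOperator(
        task_id="upsert_view",
        dataset_id=DATASET_NAME,
        table_resource={
            "tableReference": {"tableId": "test_view"},
            "view": {
                "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
                "useLegacySql": False,
            },
        },
    )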
@ -227,7 +227,7 @@ Delete table
To delete an existing table you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_table]
@ -235,7 +235,7 @@ To delete an existing table you can use
You can also use this operator to delete a view.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_view]
@ -248,7 +248,7 @@ Execute queries
Let's say you would like to execute the following query.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 0
:start-after: [START howto_operator_bigquery_query]
@ -257,7 +257,7 @@ Let's say you would like to execute the following query.
To execute the SQL query in a specific BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_execute_query]
@ -267,7 +267,7 @@ To execute the SQL query in a specific BigQuery database you can use
(sql statements), or reference to a template file. Template references are recognized
by a str ending in '.sql'.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_execute_query_list]
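
For instance, a sketch referencing a template file (``insert_rows.sql`` is a placeholder name and must be resolvable from the DAG folder or ``template_searchpath``):

.. code-block:: python

    execute_query_from_file = BigQueryExecuteQueryOperator(
        task_id="execute_query_from_file",
        sql="insert_rows.sql",  # rendered as a Jinja template because the string ends in .sql
        use_legacy_sql=False,
    )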
@ -276,7 +276,7 @@ by str ending in '.sql'.
You can store the results of the query in a table by specifying
``destination_dataset_table``.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_execute_query_save]
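
If the task may run more than once, it is worth pinning the job's write disposition as well; a sketch (``WRITE_TRUNCATE`` is one of the standard BigQuery job dispositions and overwrites the destination table on every run):

.. code-block:: python

    execute_query_overwrite = BigQueryExecuteQueryOperator(
        task_id="execute_query_overwrite",
        sql=f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
        use_legacy_sql=False,
        destination_dataset_table=f"{DATASET_NAME}.{TABLE_2}",
        write_disposition="WRITE_TRUNCATE",  # replace any previous contents of the destination table
    )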
@ -297,7 +297,7 @@ This operator expects a sql query that will return a single row. Each value on
that first row is evaluated using python ``bool`` casting. If any of the values
return ``False``, the check fails and errors out.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_check]
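
A single query can bundle several boolean conditions; a sketch using the columns from the example DAG's schema:

.. code-block:: python

    check_table_health = BigQueryCheckOperator(
        task_id="check_table_health",
        sql=f"""
            SELECT
              COUNT(*) > 0,              -- table is not empty
              COUNTIF(name IS NULL) = 0  -- no NULLs in the name column
            FROM {DATASET_NAME}.{TABLE_1}
        """,
        use_legacy_sql=False,
    )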
@ -315,7 +315,7 @@ This operator expects a sql query that will return a single row. Each value on
that first row is evaluated against ``pass_value`` which can be either a string
or numeric value. If numeric, you can also specify ``tolerance``.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_value_check]
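
For instance, a sketch that accepts a row count within 50% of the expected value (``tolerance`` is the fractional tolerance applied to ``pass_value``):

.. code-block:: python

    check_row_count = BigQueryValueCheckOperator(
        task_id="check_row_count",
        sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
        pass_value=2,
        tolerance=0.5,  # accept counts roughly between 1 and 3
        use_legacy_sql=False,
    )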
@ -330,7 +330,7 @@ To check that the values of metrics given as SQL expressions are within a certai
tolerance of the ones from ``days_back`` before, you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery.py
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_interval_check]
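
A sketch comparing today's rows with those from seven days back (``date_filter_column`` names the DATE column used to pick the two snapshots; the table and column come from the example DAG):

.. code-block:: python

    check_week_over_week = BigQueryIntervalCheckOperator(
        task_id="check_week_over_week",
        table=f"{DATASET_NAME}.{TABLE_1}",
        date_filter_column="ds",
        days_back=7,
        metrics_thresholds={"COUNT(*)": 1.5},  # counts may differ by at most a factor of 1.5
        use_legacy_sql=False,
    )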

View file

@ -18,7 +18,7 @@
"""System tests for Google Cloud Build operators"""
import pytest
from airflow.providers.google.cloud.example_dags.example_bigquery import DATA_EXPORT_BUCKET_NAME
from airflow.providers.google.cloud.example_dags.example_bigquery_transfer import DATA_EXPORT_BUCKET_NAME
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
@ -29,7 +29,6 @@ from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTe
class BigQueryExampleDagsSystemTest(GoogleSystemTest):
"""
System tests for Google BigQuery operators
It uses a real service.
"""
@ -39,8 +38,12 @@ class BigQueryExampleDagsSystemTest(GoogleSystemTest):
self.create_gcs_bucket(DATA_EXPORT_BUCKET_NAME)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag(self):
self.run_dag('example_bigquery', CLOUD_DAG_FOLDER)
def test_run_example_dag_operations(self):
self.run_dag('example_bigquery_operations', CLOUD_DAG_FOLDER)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag_queries(self):
self.run_dag('example_bigquery_queries', CLOUD_DAG_FOLDER)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def tearDown(self):

View file

@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import pytest
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.system("google.cloud")
@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
class BigQueryToBigQueryExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag_queries(self):
self.run_dag('example_bigquery_to_bigquery', CLOUD_DAG_FOLDER)

View file

@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""System tests for Google Cloud Build operators"""
import pytest
from airflow.providers.google.cloud.example_dags.example_bigquery_to_gcs import DATA_EXPORT_BUCKET_NAME
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.system("google.cloud")
@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
class BigQueryExampleDagsSystemTest(GoogleSystemTest):
@provide_gcp_context(GCP_BIGQUERY_KEY)
def setUp(self):
super().setUp()
self.create_gcs_bucket(DATA_EXPORT_BUCKET_NAME)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def test_run_example_dag_queries(self):
self.run_dag('example_bigquery_to_gcs', CLOUD_DAG_FOLDER)
@provide_gcp_context(GCP_BIGQUERY_KEY)
def tearDown(self):
self.delete_gcs_bucket(DATA_EXPORT_BUCKET_NAME)
super().tearDown()

View file

@ -138,10 +138,8 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
('cloud', 'bigquery_to_mysql'),
('cloud', 'speech_to_text'),
('cloud', 'cassandra_to_gcs'),
('cloud', 'bigquery_to_bigquery'),
('cloud', 'mysql_to_gcs'),
('cloud', 'mssql_to_gcs'),
('cloud', 'bigquery_to_gcs'),
('cloud', 'local_to_gcs'),
('cloud', 'sheets_to_gcs'),
('suite', 'gcs_to_sheets'),