[AIRFLOW-10672] Refactor BigQueryToGCSOperator to use new method (#10773)

Makes BigQueryToGCSOperator use the BigQueryHook.insert_job method.

Committer: Mateusz Kukieła <mateuszkukiela@gmail.com>
This commit is contained in:
Mateusz Kukieła 2020-09-07 16:18:16 +02:00, committed by GitHub
Parent c8ee455685
Commit f14f379716
2 changed files: 47 additions and 21 deletions
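In short, the operator stops driving the export through a DB-API cursor and instead submits a BigQuery "extract" job directly. As a minimal sketch of that job-configuration style, assuming a hypothetical table 'my-project.my_dataset.my_table' and bucket URI, the equivalent standalone hook call looks roughly like this:

from google.cloud.bigquery.table import TableReference

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook(gcp_conn_id='google_cloud_default')

# Resolve 'project.dataset.table' into the API representation the Jobs API accepts.
table_ref = TableReference.from_string('my-project.my_dataset.my_table', hook.project_id)

configuration = {
    'extract': {
        'sourceTable': table_ref.to_api_repr(),
        'compression': 'NONE',
        'destinationUris': ['gs://my-bucket/export-*.csv'],  # placeholder bucket
        'destinationFormat': 'CSV',
        # fieldDelimiter/printHeader are only valid for CSV exports.
        'fieldDelimiter': ',',
        'printHeader': True,
    }
}
hook.insert_job(configuration=configuration)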

airflow/providers/google/cloud/transfers/bigquery_to_gcs.py

@@ -19,7 +19,9 @@
 This module contains Google BigQuery to Google Cloud Storage operator.
 """
 import warnings
-from typing import Dict, List, Optional, Sequence, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
+
+from google.cloud.bigquery.table import TableReference
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -139,14 +141,26 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        conn = hook.get_conn()
-        cursor = conn.cursor()
-        cursor.run_extract(
-            source_project_dataset_table=self.source_project_dataset_table,
-            destination_cloud_storage_uris=self.destination_cloud_storage_uris,
-            compression=self.compression,
-            export_format=self.export_format,
-            field_delimiter=self.field_delimiter,
-            print_header=self.print_header,
-            labels=self.labels,
-        )
+
+        table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)
+
+        configuration: Dict[str, Any] = {
+            'extract': {
+                'sourceTable': table_ref.to_api_repr(),
+                'compression': self.compression,
+                'destinationUris': self.destination_cloud_storage_uris,
+                'destinationFormat': self.export_format,
+            }
+        }
+
+        if self.labels:
+            configuration['labels'] = self.labels
+
+        if self.export_format == 'CSV':
+            # Only set fieldDelimiter and printHeader fields if using CSV.
+            # Google does not like it if you set these fields for other export
+            # formats.
+            configuration['extract']['fieldDelimiter'] = self.field_delimiter
+            configuration['extract']['printHeader'] = self.print_header
+
+        hook.insert_job(configuration=configuration)
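From a DAG author's perspective nothing changes; a typical use of the operator, sketched with hypothetical project, dataset, table, and bucket names, still looks like:

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='example_bq_to_gcs', start_date=days_ago(1), schedule_interval=None) as dag:
    export_table = BigQueryToGCSOperator(
        task_id='export_table',
        source_project_dataset_table='my-project.my_dataset.my_table',
        destination_cloud_storage_uris=['gs://my-bucket/export-*.csv'],
        export_format='CSV',
        field_delimiter=',',
        print_header=True,
        labels={'team': 'data'},
    )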

tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py

@@ -25,6 +25,7 @@ from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
 TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
 TEST_TABLE_ID = 'test-table-id'
+PROJECT_ID = 'test-project-id'
 
 
 class TestBigQueryToCloudStorageOperator(unittest.TestCase):
@@ -38,6 +39,24 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
         print_header = True
         labels = {'k1': 'v1'}
 
+        mock_hook().project_id = PROJECT_ID
+
+        configuration = {
+            'extract': {
+                'sourceTable': {
+                    'projectId': mock_hook().project_id,
+                    'datasetId': TEST_DATASET,
+                    'tableId': TEST_TABLE_ID,
+                },
+                'compression': compression,
+                'destinationUris': destination_cloud_storage_uris,
+                'destinationFormat': export_format,
+                'fieldDelimiter': field_delimiter,
+                'printHeader': print_header,
+            },
+            'labels': labels,
+        }
+
         operator = BigQueryToGCSOperator(
             task_id=TASK_ID,
             source_project_dataset_table=source_project_dataset_table,
@@ -50,12 +69,5 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
         )
 
         operator.execute(None)
 
-        mock_hook.return_value.get_conn.return_value.cursor.return_value.run_extract.assert_called_once_with(
-            source_project_dataset_table=source_project_dataset_table,
-            destination_cloud_storage_uris=destination_cloud_storage_uris,
-            compression=compression,
-            export_format=export_format,
-            field_delimiter=field_delimiter,
-            print_header=print_header,
-            labels=labels,
-        )
+        mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
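A note on the mock wiring, since it is easy to misread: calling a Mock returns its return_value, so mock_hook() and mock_hook.return_value are the same object. Setting project_id via mock_hook() therefore configures the hook instance the operator constructs, and the final assertion inspects that same mock. A tiny self-contained illustration:

from unittest import mock

m = mock.Mock()
m().project_id = 'test-project-id'  # m() always returns m.return_value

assert m() is m.return_value
assert m.return_value.project_id == 'test-project-id'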