diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index b03d380b80..b02661f4a4 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -19,7 +19,9 @@
 This module contains Google BigQuery to Google Cloud Storage operator.
 """
 import warnings
-from typing import Dict, List, Optional, Sequence, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
+
+from google.cloud.bigquery.table import TableReference
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -139,14 +141,26 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        conn = hook.get_conn()
-        cursor = conn.cursor()
-        cursor.run_extract(
-            source_project_dataset_table=self.source_project_dataset_table,
-            destination_cloud_storage_uris=self.destination_cloud_storage_uris,
-            compression=self.compression,
-            export_format=self.export_format,
-            field_delimiter=self.field_delimiter,
-            print_header=self.print_header,
-            labels=self.labels,
-        )
+
+        table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)
+
+        configuration: Dict[str, Any] = {
+            'extract': {
+                'sourceTable': table_ref.to_api_repr(),
+                'compression': self.compression,
+                'destinationUris': self.destination_cloud_storage_uris,
+                'destinationFormat': self.export_format,
+            }
+        }
+
+        if self.labels:
+            configuration['labels'] = self.labels
+
+        if self.export_format == 'CSV':
+            # Only set fieldDelimiter and printHeader fields if using CSV.
+            # Google does not like it if you set these fields for other export
+            # formats.
+            configuration['extract']['fieldDelimiter'] = self.field_delimiter
+            configuration['extract']['printHeader'] = self.print_header
+
+        hook.insert_job(configuration=configuration)
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 8bd53e0355..c66c3fda28 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -25,6 +25,7 @@ from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToG
 TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
 TEST_TABLE_ID = 'test-table-id'
+PROJECT_ID = 'test-project-id'
 
 
 class TestBigQueryToCloudStorageOperator(unittest.TestCase):
@@ -38,6 +39,24 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
         print_header = True
         labels = {'k1': 'v1'}
 
+        mock_hook().project_id = PROJECT_ID
+
+        configuration = {
+            'extract': {
+                'sourceTable': {
+                    'projectId': mock_hook().project_id,
+                    'datasetId': TEST_DATASET,
+                    'tableId': TEST_TABLE_ID,
+                },
+                'compression': compression,
+                'destinationUris': destination_cloud_storage_uris,
+                'destinationFormat': export_format,
+                'fieldDelimiter': field_delimiter,
+                'printHeader': print_header,
+            },
+            'labels': labels,
+        }
+
         operator = BigQueryToGCSOperator(
             task_id=TASK_ID,
             source_project_dataset_table=source_project_dataset_table,
@@ -50,12 +69,5 @@ class TestBigQueryToCloudStorageOperator(unittest.TestCase):
         )
         operator.execute(None)
 
-        mock_hook.return_value.get_conn.return_value.cursor.return_value.run_extract.assert_called_once_with(
-            source_project_dataset_table=source_project_dataset_table,
-            destination_cloud_storage_uris=destination_cloud_storage_uris,
-            compression=compression,
-            export_format=export_format,
-            field_delimiter=field_delimiter,
-            print_header=print_header,
-            labels=labels,
-        )
+
+        mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
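For context, a minimal sketch of how the rewritten execute() behaves from a DAG author's point of view. The DAG id, project, dataset, table, and bucket names below are hypothetical placeholders, not values from this change; with export_format='CSV' and no labels, the operator builds the extract configuration shown in the trailing comment and passes it to hook.insert_job().

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

with DAG(dag_id="example_bq_to_gcs", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Hypothetical table and bucket; any "project.dataset.table" string that
    # TableReference.from_string() accepts works as the source table.
    extract = BigQueryToGCSOperator(
        task_id="bq_to_gcs",
        source_project_dataset_table="my-project.my_dataset.my_table",
        destination_cloud_storage_uris=["gs://my-bucket/export/data-*.csv"],
        export_format="CSV",
        field_delimiter=",",
        print_header=True,
    )

# execute() would then call hook.insert_job(configuration=...) with roughly:
# {
#     'extract': {
#         'sourceTable': {'projectId': 'my-project', 'datasetId': 'my_dataset', 'tableId': 'my_table'},
#         'compression': 'NONE',
#         'destinationUris': ['gs://my-bucket/export/data-*.csv'],
#         'destinationFormat': 'CSV',
#         'fieldDelimiter': ',',
#         'printHeader': True,
#     }
# }
# fieldDelimiter and printHeader appear only because the format is CSV, and
# no 'labels' key is set because labels were not supplied.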