[AIRFLOW-2513] Change `bql` to `sql` for BigQuery Hooks & Ops

- Change `bql` to `sql` for BigQuery Hooks &
Operators for consistency

Closes #3454 from kaxil/consistent-bq-lang
This commit is contained in:
Kaxil Naik 2018-06-04 10:04:03 +01:00 committed by Kaxil Naik
Parent a47b2776f1
Commit b220fe60d5
5 changed files with 110 additions and 29 deletions

View file

@ -63,6 +63,10 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a d
"airflow-version" label, please upgrade your google-cloud-dataflow or apache-beam version
to 2.2.0 or greater.
### BigQuery Hooks and Operator
The `bql` parameter passed to `BigQueryOperator` and `BigQueryBaseCursor.run_query` has been deprecated and renamed to `sql` for consistency purposes. Using `bql` will still work (and raise a `DeprecationWarning`), but it is deprecated
and will be removed entirely in Airflow 2.0.
### Redshift to S3 Operator
With Airflow 1.9 or lower, the Unload operation always included a header row. In order to include the header row,
we need to turn off parallel unload. It is preferred to perform the unload operation using all nodes so that it is

View file

@ -82,7 +82,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
"""
raise NotImplementedError()
def get_pandas_df(self, bql, parameters=None, dialect=None):
def get_pandas_df(self, sql, parameters=None, dialect=None):
"""
Returns a Pandas DataFrame for the results produced by a BigQuery
query. The DbApiHook method must be overridden because Pandas
@ -91,8 +91,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447
https://github.com/pydata/pandas/issues/6900
:param bql: The BigQuery SQL to execute.
:type bql: string
:param sql: The BigQuery SQL to execute.
:type sql: string
:param parameters: The parameters to render the SQL query with (not
used, leave to override superclass method)
:type parameters: mapping or iterable
@ -103,7 +103,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
if dialect is None:
dialect = 'legacy' if self.use_legacy_sql else 'standard'
return read_gbq(bql,
return read_gbq(sql,
project_id=self._get_field('project'),
dialect=dialect,
verbose=False)
@ -454,7 +454,8 @@ class BigQueryBaseCursor(LoggingMixin):
)
def run_query(self,
bql,
bql=None,
sql=None,
destination_dataset_table=False,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
@ -476,8 +477,11 @@ class BigQueryBaseCursor(LoggingMixin):
For more details about these parameters.
:param bql: The BigQuery SQL to execute.
:param bql: (Deprecated. Use `sql` parameter instead) The BigQuery SQL
to execute.
:type bql: string
:param sql: The BigQuery SQL to execute.
:type sql: string
:param destination_dataset_table: The dotted <dataset>.<table>
BigQuery table to save the query results.
:type destination_dataset_table: string
@ -526,6 +530,23 @@ class BigQueryBaseCursor(LoggingMixin):
"""
# TODO remove `bql` in Airflow 2.0 - Jira: [AIRFLOW-2513]
sql = bql if sql is None else sql
if bql:
import warnings
warnings.warn('Deprecated parameter `bql` used in '
'`BigQueryBaseCursor.run_query` '
'Use `sql` parameter instead to pass the sql to be '
'executed. `bql` parameter is deprecated and '
'will be removed in a future version of '
'Airflow.',
category=DeprecationWarning)
if sql is None:
raise TypeError('`BigQueryBaseCursor.run_query` missing 1 required '
'positional argument: `sql`')
# BigQuery also allows you to define how you want a table's schema to change
# as a side effect of a query job
# for more details:
@ -545,7 +566,7 @@ class BigQueryBaseCursor(LoggingMixin):
configuration = {
'query': {
'query': bql,
'query': sql,
'useLegacySql': use_legacy_sql,
'maximumBillingTier': maximum_billing_tier,
'maximumBytesBilled': maximum_bytes_billed,
@ -1277,9 +1298,9 @@ class BigQueryCursor(BigQueryBaseCursor):
:param parameters: Parameters to substitute into the query.
:type parameters: dict
"""
bql = _bind_parameters(operation,
sql = _bind_parameters(operation,
parameters) if parameters else operation
self.job_id = self.run_query(bql)
self.job_id = self.run_query(sql)
def executemany(self, operation, seq_of_parameters):
"""

View file

@ -29,10 +29,15 @@ class BigQueryOperator(BaseOperator):
"""
Executes BigQuery SQL queries in a specific BigQuery database
:param bql: the sql code to be executed
:param bql: (Deprecated. Use `sql` parameter instead) the sql code to be
executed (templated)
:type bql: Can receive a str representing a sql statement,
a list of str (sql statements), or reference to a template file.
Template reference are recognized by str ending in '.sql'. (templated)
Template reference are recognized by str ending in '.sql'.
:param sql: the sql code to be executed (templated)
:type sql: Can receive a str representing a sql statement,
a list of str (sql statements), or reference to a template file.
Template reference are recognized by str ending in '.sql'.
:param destination_dataset_table: A dotted
(<project>.|<project>:)<dataset>.<table> that, if set, will store the results
of the query. (templated)
@ -87,13 +92,14 @@ class BigQueryOperator(BaseOperator):
:type time_partitioning: dict
"""
template_fields = ('bql', 'destination_dataset_table')
template_fields = ('bql', 'sql', 'destination_dataset_table')
template_ext = ('.sql', )
ui_color = '#e4f0e8'
@apply_defaults
def __init__(self,
bql,
bql=None,
sql=None,
destination_dataset_table=False,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
@ -113,6 +119,7 @@ class BigQueryOperator(BaseOperator):
**kwargs):
super(BigQueryOperator, self).__init__(*args, **kwargs)
self.bql = bql
self.sql = sql if sql else bql
self.destination_dataset_table = destination_dataset_table
self.write_disposition = write_disposition
self.create_disposition = create_disposition
@ -130,9 +137,23 @@ class BigQueryOperator(BaseOperator):
self.priority = priority
self.time_partitioning = time_partitioning
# TODO remove `bql` in Airflow 2.0
if self.bql:
import warnings
warnings.warn('Deprecated parameter `bql` used in Task id: {}. '
'Use `sql` parameter instead to pass the sql to be '
'executed. `bql` parameter is deprecated and '
'will be removed in a future version of '
'Airflow.'.format(self.task_id),
category=DeprecationWarning)
if self.sql is None:
raise TypeError('{} missing 1 required positional '
'argument: `sql`'.format(self.task_id))
def execute(self, context):
if self.bq_cursor is None:
self.log.info('Executing: %s', self.bql)
self.log.info('Executing: %s', self.sql)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
use_legacy_sql=self.use_legacy_sql,
@ -140,7 +161,7 @@ class BigQueryOperator(BaseOperator):
conn = hook.get_conn()
self.bq_cursor = conn.cursor()
self.bq_cursor.run_query(
self.bql,
self.sql,
destination_dataset_table=self.destination_dataset_table,
write_disposition=self.write_disposition,
allow_large_results=self.allow_large_results,

View file

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -19,6 +19,8 @@
#
import unittest
import warnings
import mock
from airflow.contrib.hooks import bigquery_hook as hook
@ -33,6 +35,7 @@ try:
except HttpAccessTokenRefreshError:
bq_available = False
class TestBigQueryDataframeResults(unittest.TestCase):
def setUp(self):
self.instance = hook.BigQueryHook()
@ -67,6 +70,7 @@ class TestBigQueryDataframeResults(unittest.TestCase):
self.assertIn('pandas_gbq.gbq.GenericGBQException: Reason: invalidQuery',
str(context.exception), "")
class TestBigQueryTableSplitter(unittest.TestCase):
def test_internal_need_default_project(self):
with self.assertRaises(Exception) as context:
@ -104,16 +108,14 @@ class TestBigQueryTableSplitter(unittest.TestCase):
self.assertEqual("dataset", dataset)
self.assertEqual("table", table)
def test_valid_double_column(self):
project, dataset, table = hook._split_tablename('alt1:alt:dataset.table',
'project')
'project')
self.assertEqual('alt1:alt', project)
self.assertEqual("dataset", dataset)
self.assertEqual("table", table)
def test_invalid_syntax_triple_colon(self):
with self.assertRaises(Exception) as context:
hook._split_tablename('alt1:alt2:alt3:dataset.table',
@ -123,7 +125,6 @@ class TestBigQueryTableSplitter(unittest.TestCase):
str(context.exception), "")
self.assertFalse('Format exception for' in str(context.exception))
def test_invalid_syntax_triple_dot(self):
with self.assertRaises(Exception) as context:
hook._split_tablename('alt1.alt.dataset.table',
@ -213,9 +214,29 @@ class TestBigQueryBaseCursor(unittest.TestCase):
"test_schema.json",
["test_data.json"],
schema_update_options=["THIS IS NOT VALID"]
)
)
self.assertIn("THIS IS NOT VALID", str(context.exception))
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
def test_bql_deprecation_warning(self, mock_rwc):
with warnings.catch_warnings(record=True) as w:
hook.BigQueryBaseCursor("test", "test").run_query(
bql='select * from test_table'
)
self.assertIn(
'Deprecated parameter `bql`',
w[0].message.args[0])
def test_nobql_nosql_param_error(self):
with self.assertRaises(TypeError) as context:
hook.BigQueryBaseCursor("test", "test").run_query(
sql=None,
bql=None
)
self.assertIn(
'missing 1 required positional',
str(context.exception))
def test_invalid_schema_update_and_write_disposition(self):
with self.assertRaises(Exception) as context:
hook.BigQueryBaseCursor("test", "test").run_load(
@ -321,7 +342,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase):
mocked_rwc.side_effect = run_with_config
bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
bq_hook.run_query(bql='select 1')
bq_hook.run_query(sql='select 1')
mocked_rwc.assert_called_once()
@ -344,7 +365,7 @@ class TestTimePartitioningInRunJob(unittest.TestCase):
bq_hook = hook.BigQueryBaseCursor(mock.Mock(), project_id)
bq_hook.run_query(
bql='select 1',
sql='select 1',
destination_dataset_table='my_dataset.my_table',
time_partitioning={'type': 'DAY', 'field': 'test_field', 'expirationMs': 1000}
)

View file

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -18,10 +18,12 @@
# under the License.
import unittest
import warnings
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.bigquery_operator \
import BigQueryCreateExternalTableOperator
from airflow.contrib.operators.bigquery_operator import \
BigQueryCreateExternalTableOperator, \
BigQueryOperator, \
BigQueryCreateEmptyTableOperator
try:
from unittest import mock
@ -40,6 +42,18 @@ TEST_GCS_DATA = ['dir1/*.csv']
TEST_SOURCE_FORMAT = 'CSV'
class BigQueryOperatorTest(unittest.TestCase):
def test_bql_deprecation_warning(self):
with warnings.catch_warnings(record=True) as w:
BigQueryOperator(
task_id='test_deprecation_warning_for_bql',
bql='select * from test_table'
)
self.assertIn(
'Deprecated parameter `bql`',
w[0].message.args[0])
class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')