[AIRFLOW-2887] Added BigQueryCreateEmptyDatasetOperator and create_empty_dataset to bigquery_hook (#3876)

This commit is contained in:
Iuliia Volkova 2018-09-21 17:46:59 +03:00 committed by Kaxil Naik
Parent d277888a04
Commit dd85126f26
6 changed files: 195 additions and 13 deletions

View File

@@ -960,7 +960,7 @@ class BigQueryBaseCursor(LoggingMixin):
if not set(allowed_schema_update_options).issuperset(
set(schema_update_options)):
raise ValueError(
"{0} contains invalid schema update options. "
"{0} contains invalid schema update options."
"Please only use one or more of the following options: {1}"
.format(schema_update_options, allowed_schema_update_options))
@@ -1350,6 +1350,72 @@ class BigQueryBaseCursor(LoggingMixin):
view_project, view_dataset, view_table, source_project, source_dataset)
return source_dataset_resource
def create_empty_dataset(self, dataset_id="", project_id="",
dataset_reference=None):
"""
Create a new empty dataset:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
:param project_id: The name of the project in which to create
the empty dataset. Not needed if projectId is set in dataset_reference.
:type project_id: str
:param dataset_id: The id of the dataset. Not needed if datasetId
is set in dataset_reference.
:type dataset_id: str
:param dataset_reference: Dataset reference that can be provided
with the request body. More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict
"""
if dataset_reference:
_validate_value('dataset_reference', dataset_reference, dict)
else:
dataset_reference = {}
if "datasetReference" not in dataset_reference:
dataset_reference["datasetReference"] = {}
if not dataset_reference["datasetReference"].get("datasetId") and not dataset_id:
raise ValueError(
"dataset_id not provided and datasetId not set in "
"dataset_reference. Impossible to create dataset.")
dataset_required_params = [(dataset_id, "datasetId", ""),
(project_id, "projectId", self.project_id)]
for param_tuple in dataset_required_params:
param, param_name, param_default = param_tuple
if param_name not in dataset_reference['datasetReference']:
if param_default and not param:
self.log.info("{} was not specified. Will be used default "
"value {}.".format(param_name,
param_default))
param = param_default
dataset_reference['datasetReference'].update(
{param_name: param})
elif param:
_api_resource_configs_duplication_check(
param_name, param,
dataset_reference['datasetReference'], 'dataset_reference')
dataset_id = dataset_reference.get("datasetReference").get("datasetId")
dataset_project_id = dataset_reference.get("datasetReference").get(
"projectId")
self.log.info('Creating Dataset: %s in project: %s ', dataset_id,
dataset_project_id)
try:
self.service.datasets().insert(
projectId=dataset_project_id,
body=dataset_reference).execute()
self.log.info('Dataset %s created successfully in project %s',
dataset_id, dataset_project_id)
except HttpError as err:
raise AirflowException(
'BigQuery job failed. Error was: {}'.format(err.content)
)
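
For context, a minimal usage sketch of the new hook method (not part of the commit): it assumes a configured `bigquery_default` Airflow connection, and the project and dataset names are placeholders.

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Placeholder connection id; any GCP connection with BigQuery access works.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # Creates dataset 'staging_area' in 'my-project'. Extra dataset settings
    # (friendlyName, labels, ...) can be passed through dataset_reference;
    # datasetId and projectId are filled in from the arguments.
    cursor.create_empty_dataset(
        dataset_id='staging_area',
        project_id='my-project',
        dataset_reference={'friendlyName': 'Staging area'})
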
def delete_dataset(self, project_id, dataset_id):
"""
Delete a BigQuery dataset from your project.
@@ -1671,10 +1737,11 @@ def _validate_value(key, value, expected_type):
key, expected_type, type(value)))
- def _api_resource_configs_duplication_check(key, value, config_dict):
+ def _api_resource_configs_duplication_check(key, value, config_dict,
+                                             config_dict_name='api_resource_configs'):
if key in config_dict and value != config_dict[key]:
raise ValueError("Values of {param_name} param are duplicated. "
"`api_resource_configs` contained {param_name} param "
"{dict_name} contained {param_name} param "
"in `query` config and {param_name} was also provided "
"with arg to run_query() method. Please remove duplicates."
- .format(param_name=key))
+ .format(param_name=key, dict_name=config_dict_name))

View File

@@ -528,8 +528,7 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
**Example**: ::
- delete_temp_data = BigQueryDeleteDatasetOperator(
-     dataset_id = 'temp-dataset',
+ delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 'temp-dataset',
project_id = 'temp-project',
bigquery_conn_id='_my_gcp_conn_',
task_id='Deletetemp',
@@ -567,3 +566,66 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
project_id=self.project_id,
dataset_id=self.dataset_id
)
class BigQueryCreateEmptyDatasetOperator(BaseOperator):
""""
This operator is used to create new dataset for your Project in Big query.
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:param project_id: The name of the project where we want to create the dataset.
Don't need to provide, if projectId in dataset_reference.
:type project_id: str
:param dataset_id: The id of dataset. Don't need to provide,
if datasetId in dataset_reference.
:type dataset_id: str
:param dataset_reference: Dataset reference that could be provided with request body.
More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
:type dataset_reference: dict
**Example**: ::
create_new_dataset = BigQueryCreateEmptyDatasetOperator(
dataset_id = 'new-dataset',
project_id = 'my-project',
dataset_reference = {"friendlyName": "New Dataset"}
bigquery_conn_id='_my_gcp_conn_',
task_id='newDatasetCreator',
dag=dag)
"""
template_fields = ('dataset_id', 'project_id')
ui_color = '#f0eee4'
@apply_defaults
def __init__(self,
dataset_id,
project_id=None,
dataset_reference=None,
bigquery_conn_id='bigquery_default',
delegate_to=None,
*args, **kwargs):
self.dataset_id = dataset_id
self.project_id = project_id
self.bigquery_conn_id = bigquery_conn_id
self.dataset_reference = dataset_reference if dataset_reference else {}
self.delegate_to = delegate_to
self.log.info('Dataset id: %s', self.dataset_id)
self.log.info('Project id: %s', self.project_id)
super(BigQueryCreateEmptyDatasetOperator, self).__init__(*args, **kwargs)
def execute(self, context):
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.create_empty_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id,
dataset_reference=self.dataset_reference)
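
A minimal DAG sketch wiring the new operator (not from the commit; the dag id, task id, and project/dataset names are placeholders):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import \
        BigQueryCreateEmptyDatasetOperator

    dag = DAG('bq_create_dataset_example',
              start_date=datetime(2018, 9, 1),
              schedule_interval=None)

    # dataset_reference carries any additional fields of the datasets
    # resource; datasetId and projectId come from the arguments.
    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_new_dataset',
        dataset_id='new_dataset',
        project_id='my-project',
        dataset_reference={'friendlyName': 'New Dataset'},
        dag=dag)
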

View File

@@ -120,6 +120,7 @@ Operators
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

View File

@@ -350,6 +350,7 @@ BigQuery Operators
- :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
- :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
- :ref:`BigQueryDeleteDatasetOperator` : Deletes an existing BigQuery dataset.
- :ref:`BigQueryCreateEmptyDatasetOperator` : Creates an empty BigQuery dataset.
- :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
- :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
- :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -404,6 +405,13 @@ BigQueryDeleteDatasetOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator
.. _BigQueryCreateEmptyDatasetOperator:
BigQueryCreateEmptyDatasetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator
.. _BigQueryOperator:
BigQueryOperator

View File

@@ -337,8 +337,31 @@ class TestLabelsInRunJob(unittest.TestCase):
mocked_rwc.assert_called_once()
- class TestTimePartitioningInRunJob(unittest.TestCase):
+ class TestDatasetsOperations(unittest.TestCase):
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
def test_create_empty_dataset_no_dataset_id_err(self,
run_with_configuration):
with self.assertRaises(ValueError):
hook.BigQueryBaseCursor(
mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
dataset_id="", project_id="")
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
def test_create_empty_dataset_duplicates_call_err(self,
run_with_configuration):
with self.assertRaises(ValueError):
hook.BigQueryBaseCursor(
mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
dataset_id="", project_id="project_test",
dataset_reference={
"datasetReference":
{"datasetId": "test_dataset",
"projectId": "project_test2"}})
+ class TestTimePartitioningInRunJob(unittest.TestCase):
@mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
@mock.patch("airflow.contrib.hooks.bigquery_hook.time")
@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
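
A hypothetical extra test for TestDatasetsOperations above (not in the commit), sketching the fallback behaviour: when project_id is omitted everywhere, create_empty_dataset should default to the cursor's own project and send the merged body to the API.

    def test_create_empty_dataset_defaults_to_hook_project(self):
        mock_service = mock.Mock()
        cursor = hook.BigQueryBaseCursor(mock_service, 'test_project')
        cursor.create_empty_dataset(dataset_id='test_dataset')
        # projectId falls back to the cursor's project_id.
        mock_service.datasets.return_value.insert.assert_called_once_with(
            projectId='test_project',
            body={'datasetReference': {'datasetId': 'test_dataset',
                                       'projectId': 'test_project'}})
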

View File

@@ -22,8 +22,8 @@ import warnings
from airflow.contrib.operators.bigquery_operator import \
BigQueryCreateExternalTableOperator, \
- BigQueryOperator, \
- BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator
+ BigQueryOperator, BigQueryCreateEmptyTableOperator, \
+ BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator
try:
from unittest import mock
@@ -136,3 +136,24 @@ class BigQueryDeleteDatasetOperatorTest(unittest.TestCase):
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID
)
class BigQueryCreateEmptyDatasetOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
def test_execute(self, mock_hook):
operator = BigQueryCreateEmptyDatasetOperator(
task_id=TASK_ID,
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID
)
operator.execute(None)
mock_hook.return_value \
.get_conn() \
.cursor() \
.create_empty_dataset \
.assert_called_once_with(
dataset_id=TEST_DATASET,
project_id=TEST_PROJECT_ID,
dataset_reference={}
)