[AIRFLOW-5318] Option to specify location of the new BQ dataset (#5923)

This commit is contained in:
Mohannad Albanayosy 2019-09-06 12:41:40 +02:00 коммит произвёл Jarek Potiuk
Родитель 8151b7ca7f
Коммит 6b82b9ef91
4 изменённых файлов: 76 добавлений и 4 удалений

Просмотреть файл

@ -1601,7 +1601,7 @@ class BigQueryBaseCursor(LoggingMixin):
return source_dataset_resource
def create_empty_dataset(self, dataset_id="", project_id="",
dataset_reference=None):
location=None, dataset_reference=None):
"""
Create a new empty dataset:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
@ -1612,6 +1612,9 @@ class BigQueryBaseCursor(LoggingMixin):
:param dataset_id: The id of dataset. Don't need to provide,
if datasetId in dataset_reference.
:type dataset_id: str
:param location: (Optional) The geographic location where the dataset should reside.
There is no default value but the dataset will be created in US if nothing is provided.
:type location: str
:param dataset_reference: Dataset reference that could be provided
with request body. More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
@ -1648,6 +1651,14 @@ class BigQueryBaseCursor(LoggingMixin):
param_name, param,
dataset_reference['datasetReference'], 'dataset_reference')
if location:
if 'location' not in dataset_reference:
dataset_reference.update({'location': location})
else:
_api_resource_configs_duplication_check(
'location', location,
dataset_reference, 'dataset_reference')
dataset_id = dataset_reference.get("datasetReference").get("datasetId")
dataset_project_id = dataset_reference.get("datasetReference").get(
"projectId")

Просмотреть файл

@ -725,6 +725,9 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
:param dataset_id: The id of dataset. Don't need to provide,
if datasetId in dataset_reference.
:type dataset_id: str
:param location: (Optional) The geographic location where the dataset should reside.
There is no default value but the dataset will be created in US if nothing is provided.
:type location: str
:param dataset_reference: Dataset reference that could be provided with request body.
More info:
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
@ -754,6 +757,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
dataset_id,
project_id=None,
dataset_reference=None,
location=None,
gcp_conn_id='google_cloud_default',
bigquery_conn_id=None,
delegate_to=None,
@ -767,6 +771,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
self.dataset_id = dataset_id
self.project_id = project_id
self.location = location
self.gcp_conn_id = gcp_conn_id
self.dataset_reference = dataset_reference if dataset_reference else {}
self.delegate_to = delegate_to
@ -786,7 +791,8 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
cursor.create_empty_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id,
dataset_reference=self.dataset_reference)
dataset_reference=self.dataset_reference,
location=self.location)
class BigQueryGetDatasetOperator(BaseOperator):

Просмотреть файл

@ -688,7 +688,6 @@ class TestLabelsInRunJob(unittest.TestCase):
class TestDatasetsOperations(unittest.TestCase):
def test_create_empty_dataset_no_dataset_id_err(self):
with self.assertRaises(ValueError):
hook.BigQueryBaseCursor(
mock.Mock(), "test_create_empty_dataset").create_empty_dataset(
@ -704,6 +703,59 @@ class TestDatasetsOperations(unittest.TestCase):
{"datasetId": "test_dataset",
"projectId": "project_test2"}})
def test_create_empty_dataset_with_location_duplicates_call_err(self):
    """Expect ``ValueError`` when ``location`` differs from ``dataset_reference['location']``.

    The call passes ``location="EU"`` while the supplied dataset reference
    already carries ``"location": "US"``.
    """
    reference_with_other_location = {
        "location": "US",
        "datasetReference": {
            "datasetId": "test_dataset",
            "projectId": "project_test",
        },
    }
    with self.assertRaises(ValueError):
        # The cursor is built inside the context manager, as in the original test.
        cursor = hook.BigQueryBaseCursor(mock.Mock(), "test_create_empty_dataset")
        cursor.create_empty_dataset(
            dataset_id="",
            project_id="project_test",
            location="EU",
            dataset_reference=reference_with_other_location,
        )
def test_create_empty_dataset_with_location(self):
    """Check that ``location`` ends up in the body passed to ``datasets().insert``."""
    bq_project = 'bq-project'
    bq_dataset = 'bq_dataset'
    bq_location = 'EU'

    service = mock.Mock()
    insert_call = service.datasets.return_value.insert

    base_cursor = hook.BigQueryBaseCursor(service, bq_project)
    base_cursor.create_empty_dataset(
        project_id=bq_project,
        dataset_id=bq_dataset,
        location=bq_location,
    )

    # Exactly one insert is expected, carrying both the location and the
    # dataset reference assembled from the separate id arguments.
    insert_call.assert_called_once_with(
        projectId=bq_project,
        body={
            "location": "EU",
            "datasetReference": {
                "datasetId": "bq_dataset",
                "projectId": "bq-project",
            },
        },
    )
def test_create_empty_dataset_with_location_duplicates_call_no_err(self):
    """Check that a ``location`` equal to the one in ``dataset_reference`` is accepted.

    Both the ``location`` argument and ``dataset_reference['location']`` are
    ``"EU"``; the test expects a single ``datasets().insert`` call whose body
    contains that location once.
    """
    bq_project = 'bq-project'
    bq_dataset = 'bq_dataset'
    bq_location = 'EU'
    reference = {"location": "EU"}

    service = mock.Mock()
    insert_call = service.datasets.return_value.insert

    base_cursor = hook.BigQueryBaseCursor(service, bq_project)
    base_cursor.create_empty_dataset(
        project_id=bq_project,
        dataset_id=bq_dataset,
        location=bq_location,
        dataset_reference=reference,
    )

    insert_call.assert_called_once_with(
        projectId=bq_project,
        body={
            "location": "EU",
            "datasetReference": {
                "datasetId": "bq_dataset",
                "projectId": "bq-project",
            },
        },
    )
def test_get_dataset_without_dataset_id(self):
with mock.patch.object(hook.BigQueryHook, 'get_service'):
with self.assertRaises(ValueError):

Просмотреть файл

@ -42,6 +42,7 @@ from tests.compat import mock
TASK_ID = 'test-bq-create-table-operator'
TEST_DATASET = 'test-dataset'
TEST_DATASET_LOCATION = 'EU'
TEST_GCP_PROJECT_ID = 'test-project'
TEST_DELETE_CONTENTS = True
TEST_TABLE_ID = 'test-table-id'
@ -146,7 +147,8 @@ class TestBigQueryCreateEmptyDatasetOperator(unittest.TestCase):
operator = BigQueryCreateEmptyDatasetOperator(
task_id=TASK_ID,
dataset_id=TEST_DATASET,
project_id=TEST_GCP_PROJECT_ID
project_id=TEST_GCP_PROJECT_ID,
location=TEST_DATASET_LOCATION
)
operator.execute(None)
@ -157,6 +159,7 @@ class TestBigQueryCreateEmptyDatasetOperator(unittest.TestCase):
.assert_called_once_with(
dataset_id=TEST_DATASET,
project_id=TEST_GCP_PROJECT_ID,
location=TEST_DATASET_LOCATION,
dataset_reference={}
)