[AIRFLOW-6442] BigQuery hook - standardize handling http exceptions (#7028)
Parent: 20299473f1
Commit: 848022abdd
@@ -57,6 +57,15 @@ https://developers.google.com/style/inclusive-documentation
 -->
 
+### Standardize handling http exception in BigQuery
+
+Since BigQuery is part of GCP, it was possible to simplify the code by handling exceptions
+with the `airflow.gcp.hooks.base.CloudBaseHook.catch_http_exception` decorator. However, this
+changes the exceptions raised by the following methods:
+* `airflow.gcp.hooks.bigquery.BigQueryBaseCursor.run_table_delete` raises `AirflowException` instead of `Exception`.
+* `airflow.gcp.hooks.bigquery.BigQueryBaseCursor.create_empty_dataset` raises `AirflowException` instead of `ValueError`.
+* `airflow.gcp.hooks.bigquery.BigQueryBaseCursor.get_dataset` raises `AirflowException` instead of `ValueError`.
+
 ### Remove airflow.utils.file.TemporaryDirectory
 
 Since Airflow dropped support for Python < 3.5 there's no need to have this custom
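For DAG or operator code that previously caught the old exception types from these methods, the call sites need to expect `AirflowException` instead. A minimal, hypothetical sketch of the adjustment (the connection id, project, dataset and table names below are placeholders, not part of this change):

```python
from airflow.exceptions import AirflowException
from airflow.gcp.hooks.bigquery import BigQueryHook

# Hypothetical names; replace with your own connection and table.
cursor = BigQueryHook(bigquery_conn_id='bigquery_default').get_cursor()

try:
    # Used to raise a bare Exception when the table was missing;
    # with catch_http_exception it now surfaces as AirflowException.
    cursor.run_table_delete('my-project.my_dataset.my_table')
except AirflowException as err:
    # Handle or re-raise the standardized error.
    print('Table deletion failed: {}'.format(err))
```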
@@ -231,6 +231,7 @@ class BigQueryBaseCursor(LoggingMixin):
         self.num_retries = num_retries
 
     # pylint: disable=too-many-arguments
+    @CloudBaseHook.catch_http_exception
     def create_empty_table(self,
                            project_id: str,
                            dataset_id: str,
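The `catch_http_exception` decorator itself lives in `airflow.gcp.hooks.base` and is not part of this diff. As a rough mental model only (not the actual implementation), a decorator of this kind re-raises the client library's `HttpError` as `AirflowException`, which is what allows the per-method `try`/`except` blocks below to be deleted:

```python
import functools

from googleapiclient.errors import HttpError

from airflow.exceptions import AirflowException


def catch_http_exception(func):
    """Illustrative sketch only: convert HttpError into AirflowException."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except HttpError as err:
            raise AirflowException(
                'BigQuery job failed. Error was: {}'.format(err.content)
            )
    return wrapper
```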
@@ -334,6 +335,7 @@ class BigQueryBaseCursor(LoggingMixin):
             datasetId=dataset_id,
             body=table_resource).execute(num_retries=num_retries)
 
+    @CloudBaseHook.catch_http_exception
     def create_external_table(self, # pylint: disable=too-many-locals,too-many-arguments
                               external_project_dataset_table: str,
                               schema_fields: List,
@@ -537,6 +539,7 @@ class BigQueryBaseCursor(LoggingMixin):
         self.log.info('External table created successfully: %s',
                       external_project_dataset_table)
 
+    @CloudBaseHook.catch_http_exception
     def patch_table(self, # pylint: disable=too-many-arguments
                     dataset_id: str,
                     table_id: str,
@@ -640,20 +643,14 @@ class BigQueryBaseCursor(LoggingMixin):
         self.log.info('Patching Table %s:%s.%s',
                       project_id, dataset_id, table_id)
 
-        try:
-            self.service.tables().patch(
-                projectId=project_id,
-                datasetId=dataset_id,
-                tableId=table_id,
-                body=table_resource).execute(num_retries=self.num_retries)
-
-            self.log.info('Table patched successfully: %s:%s.%s',
-                          project_id, dataset_id, table_id)
-
-        except HttpError as err:
-            raise AirflowException(
-                'BigQuery job failed. Error was: {}'.format(err.content)
-            )
+        self.service.tables().patch(
+            projectId=project_id,
+            datasetId=dataset_id,
+            tableId=table_id,
+            body=table_resource).execute(num_retries=self.num_retries)
+
+        self.log.info('Table patched successfully: %s:%s.%s',
+                      project_id, dataset_id, table_id)
 
     # pylint: disable=too-many-locals,too-many-arguments, too-many-branches
     def run_query(self,
@@ -1421,6 +1418,7 @@ class BigQueryBaseCursor(LoggingMixin):
                           self.running_job_id)
             time.sleep(5)
 
+    @CloudBaseHook.catch_http_exception
     def get_dataset_tables(self, dataset_id: str, project_id: Optional[str] = None,
                            max_results: Optional[int] = None,
                            page_token: Optional[str] = None) -> Dict[str, Union[str, int, List]]:
@@ -1454,6 +1452,7 @@ class BigQueryBaseCursor(LoggingMixin):
             datasetId=dataset_id,
             **optional_params).execute(num_retries=self.num_retries))
 
+    @CloudBaseHook.catch_http_exception
     def get_schema(self, dataset_id: str, table_id: str) -> Dict:
         """
         Get the schema for a given datset.table.
@@ -1468,6 +1467,7 @@ class BigQueryBaseCursor(LoggingMixin):
             .execute(num_retries=self.num_retries)
         return tables_resource['schema']
 
+    @CloudBaseHook.catch_http_exception
     def get_tabledata(self, dataset_id: str, table_id: str,
                       max_results: Optional[int] = None, selected_fields: Optional[str] = None,
                       page_token: Optional[str] = None, start_index: Optional[int] = None) -> Dict:
@@ -1502,6 +1502,7 @@ class BigQueryBaseCursor(LoggingMixin):
             tableId=table_id,
             **optional_params).execute(num_retries=self.num_retries))
 
+    @CloudBaseHook.catch_http_exception
     def run_table_delete(self, deletion_dataset_table: str,
                          ignore_if_missing: bool = False) -> None:
         """
@@ -1530,12 +1531,13 @@ class BigQueryBaseCursor(LoggingMixin):
                 .execute(num_retries=self.num_retries)
             self.log.info('Deleted table %s:%s.%s.', deletion_project,
                           deletion_dataset, deletion_table)
-        except HttpError:
-            if not ignore_if_missing:
-                raise Exception('Table deletion failed. Table does not exist.')
-            else:
+        except HttpError as e:
+            if e.resp.status == 404 and ignore_if_missing:
                 self.log.info('Table does not exist. Skipping.')
+            else:
+                raise e
 
+    @CloudBaseHook.catch_http_exception
     def run_table_upsert(self, dataset_id: str, table_resource: Dict,
                          project_id: Optional[str] = None) -> Dict:
         """
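With this change `ignore_if_missing` only swallows a genuine 404 (table not found); any other HTTP failure propagates and is converted to `AirflowException` by the decorator. A hedged call-site sketch (the table path is a placeholder):

```python
from airflow.gcp.hooks.bigquery import BigQueryHook

cursor = BigQueryHook().get_cursor()

# Only a 404 (missing table) is skipped; other HTTP errors still raise.
cursor.run_table_delete('my-project.my_dataset.my_table', ignore_if_missing=True)
```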
@@ -1586,6 +1588,7 @@ class BigQueryBaseCursor(LoggingMixin):
             datasetId=dataset_id,
             body=table_resource).execute(num_retries=self.num_retries)
 
+    @CloudBaseHook.catch_http_exception
     def run_grant_dataset_view_access(self,
                                       source_dataset: str,
                                       view_dataset: str,
@@ -1649,6 +1652,7 @@ class BigQueryBaseCursor(LoggingMixin):
             view_project, view_dataset, view_table, source_project, source_dataset)
         return source_dataset_resource
 
+    @CloudBaseHook.catch_http_exception
     def create_empty_dataset(self,
                              dataset_id: str = "",
                              project_id: str = "",
@@ -1722,6 +1726,7 @@ class BigQueryBaseCursor(LoggingMixin):
             projectId=dataset_project_id,
             body=dataset_reference).execute(num_retries=self.num_retries)
 
+    @CloudBaseHook.catch_http_exception
     def delete_dataset(self, project_id: str, dataset_id: str, delete_contents: bool = False) -> None:
         """
         Delete a dataset of Big query in your project.
@@ -1754,6 +1759,7 @@ class BigQueryBaseCursor(LoggingMixin):
                 'BigQuery job failed. Error was: {}'.format(err.content)
             )
 
+    @CloudBaseHook.catch_http_exception
     def get_dataset(self, dataset_id: str, project_id: Optional[str] = None) -> Dict:
         """
         Method returns dataset_resource if dataset exist
@@ -1776,16 +1782,13 @@ class BigQueryBaseCursor(LoggingMixin):
 
         dataset_project_id = project_id if project_id else self.project_id
 
-        try:
-            dataset_resource = self.service.datasets().get(
-                datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries)
-            self.log.info("Dataset Resource: %s", dataset_resource)
-        except HttpError as err:
-            raise AirflowException(
-                'BigQuery job failed. Error was: {}'.format(err.content))
+        dataset_resource = self.service.datasets().get(
+            datasetId=dataset_id, projectId=dataset_project_id).execute(num_retries=self.num_retries)
+        self.log.info("Dataset Resource: %s", dataset_resource)
 
         return dataset_resource
 
+    @CloudBaseHook.catch_http_exception
     def get_datasets_list(self, project_id: Optional[str] = None) -> List:
         """
         Method returns full list of BigQuery datasets in the current project
@@ -1823,14 +1826,9 @@ class BigQueryBaseCursor(LoggingMixin):
         """
         dataset_project_id = project_id if project_id else self.project_id
 
-        try:
-            datasets_list = self.service.datasets().list(
-                projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets']
-            self.log.info("Datasets List: %s", datasets_list)
-
-        except HttpError as err:
-            raise AirflowException(
-                'BigQuery job failed. Error was: {}'.format(err.content))
+        datasets_list = self.service.datasets().list(
+            projectId=dataset_project_id).execute(num_retries=self.num_retries)['datasets']
+        self.log.info("Datasets List: %s", datasets_list)
 
         return datasets_list
 
@@ -1897,6 +1895,7 @@ class BigQueryBaseCursor(LoggingMixin):
 
         return dataset_tables_list
 
+    @CloudBaseHook.catch_http_exception
     def patch_dataset(self, dataset_id: str, dataset_resource: str, project_id: Optional[str] = None) -> Dict:
         """
         Patches information in an existing dataset.
@@ -1924,24 +1923,20 @@ class BigQueryBaseCursor(LoggingMixin):
 
         dataset_project_id = project_id if project_id else self.project_id
 
-        try:
-            dataset = (
-                self.service.datasets()
-                .patch(
-                    datasetId=dataset_id,
-                    projectId=dataset_project_id,
-                    body=dataset_resource,
-                )
-                .execute(num_retries=self.num_retries)
-            )
-            self.log.info("Dataset successfully patched: %s", dataset)
-        except HttpError as err:
-            raise AirflowException(
-                "BigQuery job failed. Error was: {}".format(err.content)
-            )
+        dataset = (
+            self.service.datasets()
+            .patch(
+                datasetId=dataset_id,
+                projectId=dataset_project_id,
+                body=dataset_resource,
+            )
+            .execute(num_retries=self.num_retries)
+        )
+        self.log.info("Dataset successfully patched: %s", dataset)
 
         return dataset
 
+    @CloudBaseHook.catch_http_exception
     def update_dataset(self, dataset_id: str,
                        dataset_resource: Dict, project_id: Optional[str] = None) -> Dict:
         """
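Both `patch_dataset` and `update_dataset` now rely on the decorator for HTTP error handling; their semantics follow the underlying BigQuery API, where `datasets().patch` changes only the fields supplied in the body while `datasets().update` replaces the whole dataset resource. A hypothetical usage sketch (dataset and project ids are placeholders, not part of this change):

```python
from airflow.gcp.hooks.bigquery import BigQueryHook

cursor = BigQueryHook().get_cursor()

# patch_dataset: only the supplied fields are modified.
cursor.patch_dataset(
    dataset_id='my_dataset',
    dataset_resource={'description': 'Nightly snapshot of events'},
)

# update_dataset: the body replaces the existing dataset resource.
cursor.update_dataset(
    dataset_id='my_dataset',
    dataset_resource={
        'datasetReference': {'datasetId': 'my_dataset', 'projectId': 'my-project'},
        'description': 'Nightly snapshot of events',
    },
)
```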
@@ -1971,24 +1966,20 @@ class BigQueryBaseCursor(LoggingMixin):
 
         dataset_project_id = project_id if project_id else self.project_id
 
-        try:
-            dataset = (
-                self.service.datasets()
-                .update(
-                    datasetId=dataset_id,
-                    projectId=dataset_project_id,
-                    body=dataset_resource,
-                )
-                .execute(num_retries=self.num_retries)
-            )
-            self.log.info("Dataset successfully updated: %s", dataset)
-        except HttpError as err:
-            raise AirflowException(
-                "BigQuery job failed. Error was: {}".format(err.content)
-            )
+        dataset = (
+            self.service.datasets()
+            .update(
+                datasetId=dataset_id,
+                projectId=dataset_project_id,
+                body=dataset_resource,
+            )
+            .execute(num_retries=self.num_retries)
+        )
+        self.log.info("Dataset successfully updated: %s", dataset)
 
         return dataset
 
+    @CloudBaseHook.catch_http_exception
     def insert_all(self, project_id: str, dataset_id: str, table_id: str,
                    rows: List, ignore_unknown_values: bool = False,
                    skip_invalid_rows: bool = False, fail_on_error: bool = False) -> None:
@@ -2035,35 +2026,30 @@ class BigQueryBaseCursor(LoggingMixin):
             "skipInvalidRows": skip_invalid_rows,
         }
 
-        try:
-            self.log.info(
-                'Inserting %s row(s) into Table %s:%s.%s',
-                len(rows), dataset_project_id, dataset_id, table_id
-            )
-
-            resp = self.service.tabledata().insertAll(
-                projectId=dataset_project_id, datasetId=dataset_id,
-                tableId=table_id, body=body
-            ).execute(num_retries=self.num_retries)
-
-            if 'insertErrors' not in resp:
-                self.log.info(
-                    'All row(s) inserted successfully: %s:%s.%s',
-                    dataset_project_id, dataset_id, table_id
-                )
-            else:
-                error_msg = '{} insert error(s) occurred: {}:{}.{}. Details: {}'.format(
-                    len(resp['insertErrors']),
-                    dataset_project_id, dataset_id, table_id, resp['insertErrors'])
-                if fail_on_error:
-                    raise AirflowException(
-                        'BigQuery job failed. Error was: {}'.format(error_msg)
-                    )
-                self.log.info(error_msg)
-        except HttpError as err:
-            raise AirflowException(
-                'BigQuery job failed. Error was: {}'.format(err.content)
-            )
+        self.log.info(
+            'Inserting %s row(s) into Table %s:%s.%s',
+            len(rows), dataset_project_id, dataset_id, table_id
+        )
+
+        resp = self.service.tabledata().insertAll(
+            projectId=dataset_project_id, datasetId=dataset_id,
+            tableId=table_id, body=body
+        ).execute(num_retries=self.num_retries)
+
+        if 'insertErrors' not in resp:
+            self.log.info(
+                'All row(s) inserted successfully: %s:%s.%s',
+                dataset_project_id, dataset_id, table_id
+            )
+        else:
+            error_msg = '{} insert error(s) occurred: {}:{}.{}. Details: {}'.format(
+                len(resp['insertErrors']),
+                dataset_project_id, dataset_id, table_id, resp['insertErrors'])
+            if fail_on_error:
+                raise AirflowException(
+                    'BigQuery job failed. Error was: {}'.format(error_msg)
+                )
+            self.log.info(error_msg)
 
 
 class BigQueryCursor(BigQueryBaseCursor):
@@ -659,12 +659,12 @@ class TestBigQueryBaseCursor(unittest.TestCase):
     def test_run_table_delete_ignore_if_missing_fails(self, mock_get_service, mock_project_id):
         source_project_dataset_table = "{}.{}.{}".format(PROJECT_ID, DATASET_ID, TABLE_ID)
         method = mock_get_service.return_value.tables.return_value.delete
-        resp = type('', (object,), {"status": 404, })()
+        resp = type('', (object,), {"status": 404, "reason": "Address not found"})()
         method.return_value.execute.side_effect = HttpError(
             resp=resp, content=b'Address not found')
         bq_hook = hook.BigQueryHook()
         cursor = bq_hook.get_cursor()
-        with self.assertRaisesRegex(Exception, r"Table deletion failed\. Table does not exist\."):
+        with self.assertRaisesRegex(AirflowException, r"Address not found"):
             cursor.run_table_delete(source_project_dataset_table)
         method.assert_called_once_with(datasetId=DATASET_ID, projectId=PROJECT_ID, tableId=TABLE_ID)
 
@@ -676,7 +676,7 @@ class TestBigQueryBaseCursor(unittest.TestCase):
     def test_run_table_delete_ignore_if_missing_pass(self, mock_get_service, mock_project_id):
         source_project_dataset_table = "{}.{}.{}".format(PROJECT_ID, DATASET_ID, TABLE_ID)
         method = mock_get_service.return_value.tables.return_value.delete
-        resp = type('', (object,), {"status": 404, })()
+        resp = type('', (object,), {"status": 404, "reason": "Address not found"})()
         method.return_value.execute.side_effect = HttpError(
             resp=resp, content=b'Address not found')
         bq_hook = hook.BigQueryHook()
@@ -1416,8 +1416,11 @@ class TestDatasetsOperations(unittest.TestCase):
     )
     @mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
     def test_create_empty_dataset_no_dataset_id_err(self, mock_get_service, mock_get_creds_and_proj_id):
-        with self.assertRaisesRegex(ValueError, r"dataset_id not provided and datasetId not exist in the "
-                                                r"datasetReference\. Impossible to create dataset"):
+        with self.assertRaisesRegex(
+            AirflowException,
+            r"dataset_id not provided and datasetId not exist in the "
+            r"datasetReference\. Impossible to create dataset"
+        ):
             bq_hook = hook.BigQueryHook()
             cursor = bq_hook.get_cursor()
             cursor.create_empty_dataset(dataset_id="", project_id="")
@@ -1429,7 +1432,7 @@ class TestDatasetsOperations(unittest.TestCase):
     @mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
     def test_create_empty_dataset_duplicates_call_err(self, mock_get_service, mock_get_creds_and_proj_id):
         with self.assertRaisesRegex(
-            ValueError,
+            AirflowException,
             r"Values of projectId param are duplicated\. dataset_reference contained projectId param in "
             r"`query` config and projectId was also provided with arg to run_query\(\) method\. "
             r"Please remove duplicates\."
@@ -1452,7 +1455,7 @@ class TestDatasetsOperations(unittest.TestCase):
         self, mock_get_service, mock_get_creds_and_proj_id
     ):
         with self.assertRaisesRegex(
-            ValueError,
+            AirflowException,
             r"Values of location param are duplicated\. dataset_reference contained location param in "
             r"`query` config and location was also provided with arg to run_query\(\) method\. "
             r"Please remove duplicates\."
@@ -1524,7 +1527,7 @@ class TestDatasetsOperations(unittest.TestCase):
     @mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
     def test_get_dataset_without_dataset_id(self, mock_get_service, mock_get_creds_and_proj_id):
         with self.assertRaisesRegex(
-            ValueError,
+            AirflowException,
             r"ataset_id argument must be provided and has a type 'str'\. You provided: "
         ):
             bq_hook = hook.BigQueryHook()