[AIRFLOW-6485] BigQuery hook - add missing test for BIgQueryBaseCursor methods (#7077)
This commit is contained in:
Родитель
e863f8a08c
Коммит
6cbedf8775
|
@ -850,6 +850,155 @@ class TestBigQueryBaseCursor(unittest.TestCase):
|
|||
|
||||
self.assertEqual(table_list, result)
|
||||
|
||||
@parameterized.expand([
|
||||
("US", None, True),
|
||||
(None, None, True),
|
||||
(
|
||||
None,
|
||||
HttpError(resp=type('', (object,), {"status": 500, })(), content=b'Internal Server Error'),
|
||||
False
|
||||
),
|
||||
(
|
||||
None,
|
||||
HttpError(resp=type('', (object,), {"status": 503, })(), content=b'Service Unavailable'),
|
||||
False
|
||||
),
|
||||
])
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
def test_poll_job_complete_pass(
|
||||
self, location, exception, expected_result, mock_get_service, mock_get_creds_and_proj_id
|
||||
):
|
||||
method_jobs = mock_get_service.return_value.jobs
|
||||
method_get = method_jobs.return_value.get
|
||||
method_execute = method_get.return_value.execute
|
||||
method_execute.return_value = {"status": {"state": "DONE"}}
|
||||
method_execute.side_effect = exception
|
||||
|
||||
hook_params = {"location": location} if location else {}
|
||||
bq_hook = hook.BigQueryHook(**hook_params)
|
||||
cursor = bq_hook.get_cursor()
|
||||
|
||||
result = cursor.poll_job_complete(JOB_ID)
|
||||
self.assertEqual(expected_result, result)
|
||||
method_get.assert_called_once_with(projectId=PROJECT_ID, jobId=JOB_ID, **hook_params)
|
||||
assert method_jobs.call_count == 1
|
||||
assert method_get.call_count == 1
|
||||
assert method_execute.call_count == 1
|
||||
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
def test_pull_job_complete_on_fails(self, mock_get_service, mock_get_creds_and_proj_id):
|
||||
method_jobs = mock_get_service.return_value.jobs
|
||||
method_get = method_jobs.return_value.get
|
||||
method_execute = method_get.return_value.execute
|
||||
resp = type('', (object,), {"status": 404, "reason": "Not Found"})()
|
||||
method_execute.side_effect = HttpError(resp=resp, content=b'Not Found')
|
||||
|
||||
bq_hook = hook.BigQueryHook()
|
||||
cursor = bq_hook.get_cursor()
|
||||
with self.assertRaisesRegex(AirflowException, "HttpError 404 \"Not Found\""):
|
||||
cursor.poll_job_complete(JOB_ID)
|
||||
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
|
||||
@mock.patch("logging.Logger.info")
|
||||
def test_cancel_query_np_jobs_to_cancel(
|
||||
self, mock_logger_info, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
|
||||
):
|
||||
method_jobs = mock_get_service.return_value.jobs
|
||||
poll_job_complete.return_value = True
|
||||
|
||||
bq_hook = hook.BigQueryHook()
|
||||
cursor = bq_hook.get_cursor()
|
||||
cursor.running_job_id = JOB_ID
|
||||
cursor.cancel_query()
|
||||
assert method_jobs.call_count == 1
|
||||
assert poll_job_complete.call_count == 1
|
||||
mock_logger_info.has_call(mock.call("No running BigQuery jobs to cancel."))
|
||||
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
|
||||
@mock.patch("time.sleep")
|
||||
@mock.patch("logging.Logger.info")
|
||||
def test_cancel_query_np_cancel_timeout(
|
||||
self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
|
||||
):
|
||||
method_jobs = mock_get_service.return_value.jobs
|
||||
method_jobs_cancel = method_jobs.return_value.cancel
|
||||
poll_job_complete.side_effect = [False] * 13
|
||||
|
||||
bq_hook = hook.BigQueryHook()
|
||||
cursor = bq_hook.get_cursor()
|
||||
cursor.running_job_id = JOB_ID
|
||||
cursor.cancel_query()
|
||||
assert method_jobs.call_count == 1
|
||||
assert method_jobs_cancel.call_count == 1
|
||||
assert poll_job_complete.call_count == 13
|
||||
assert mock_sleep.call_count == 11
|
||||
mock_logger_info.has_call(
|
||||
mock.call("Stopping polling due to timeout. Job with id {} "
|
||||
"has not completed cancel and may or may not finish.".format(JOB_ID))
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryBaseCursor.poll_job_complete")
|
||||
@mock.patch("time.sleep")
|
||||
@mock.patch("logging.Logger.info")
|
||||
def test_cancel_query_np_cancel_completed(
|
||||
self, mock_logger_info, mock_sleep, poll_job_complete, mock_get_service, mock_get_creds_and_proj_id
|
||||
):
|
||||
method_jobs = mock_get_service.return_value.jobs
|
||||
method_jobs_cancel = method_jobs.return_value.cancel
|
||||
poll_job_complete.side_effect = [False] * 12 + [True]
|
||||
|
||||
bq_hook = hook.BigQueryHook()
|
||||
cursor = bq_hook.get_cursor()
|
||||
cursor.running_job_id = JOB_ID
|
||||
cursor.cancel_query()
|
||||
assert method_jobs.call_count == 1
|
||||
assert method_jobs_cancel.call_count == 1
|
||||
assert poll_job_complete.call_count == 13
|
||||
assert mock_sleep.call_count == 11
|
||||
mock_logger_info.has_call(mock.call("Job successfully canceled: {}, {}".format(PROJECT_ID, JOB_ID)))
|
||||
|
||||
@mock.patch(
|
||||
'airflow.gcp.hooks.base.CloudBaseHook._get_credentials_and_project_id',
|
||||
return_value=(CREDENTIALS, PROJECT_ID)
|
||||
)
|
||||
@mock.patch("airflow.gcp.hooks.bigquery.BigQueryHook.get_service")
|
||||
def test_get_schema(self, mock_get_service, mock_get_creds_and_proj_id):
|
||||
schema = "SCHEMA"
|
||||
method_get = mock_get_service.return_value.tables.return_value.get
|
||||
method_execute = method_get.return_value.execute
|
||||
method_execute.return_value = {"schema": schema}
|
||||
|
||||
bq_hook = hook.BigQueryHook()
|
||||
cursor = bq_hook.get_cursor()
|
||||
result = cursor.get_schema(dataset_id=DATASET_ID, table_id=TABLE_ID)
|
||||
|
||||
method_get.assert_called_once_with(projectId=PROJECT_ID, datasetId=DATASET_ID, tableId=TABLE_ID)
|
||||
assert method_execute.call_count == 1
|
||||
self.assertEqual(schema, result)
|
||||
|
||||
|
||||
class TestTableDataOperations(unittest.TestCase):
|
||||
@mock.patch(
|
||||
|
|
Загрузка…
Ссылка в новой задаче