# bigquery-etl/tests/test_deploy.py
#
# 145 lines
# 5.0 KiB
# Python

from unittest.mock import patch
import pytest
import yaml
from google.cloud.exceptions import NotFound
from bigquery_etl import deploy
class TestDeploy:
    """Exercise ``deploy.deploy_table`` against on-disk query directories.

    Tests that patch ``google.cloud.bigquery.Client`` and
    ``bigquery_etl.dryrun.DryRun`` replace both with mocks; the remaining
    tests call ``deploy_table`` without patching anything.
    """

    @staticmethod
    def _f0_schema():
        """Return a fresh schema dict with a single INTEGER field named ``f0_``."""
        return {"fields": [{"name": "f0_", "type": "INTEGER"}]}

    @staticmethod
    def _make_query_dir(sql_root):
        """Create the ``test_query_v1`` directory below *sql_root*.

        The directory is ``moz-fx-data-shared-prod/test/test_query_v1`` and is
        populated with a ``query.sql`` containing ``SELECT 1``.
        """
        query_dir = sql_root / "moz-fx-data-shared-prod" / "test" / "test_query_v1"
        query_dir.mkdir(parents=True)
        (query_dir / "query.sql").write_text("SELECT 1")
        return query_dir

    @pytest.fixture(scope="session")
    def query_path(self, tmp_path_factory):
        """Provide a query directory holding ``query.sql`` and ``schema.yaml``.

        The schema file declares one INTEGER field, ``f0_``.
        """
        query_dir = self._make_query_dir(tmp_path_factory.mktemp("sql"))
        (query_dir / "schema.yaml").write_text(yaml.dump(self._f0_schema()))
        return query_dir

    @patch("google.cloud.bigquery.Client")
    @patch("bigquery_etl.dryrun.DryRun")
    def test_deploy_new_table(self, mock_dryrun, mock_client, query_path):
        """When ``get_table`` raises NotFound, the table is created, not updated."""
        mock_dryrun().get_schema.return_value = self._f0_schema()
        bq = mock_client.return_value
        bq.get_table.side_effect = NotFound("table not found")

        deploy.deploy_table(query_file=query_path / "query.sql")

        bq.create_table.assert_called_once()
        bq.update_table.assert_not_called()

    @patch("google.cloud.bigquery.Client")
    @patch("bigquery_etl.dryrun.DryRun")
    def test_deploy_new_table_with_missing_dataset(
        self, mock_dryrun, mock_client, query_path
    ):
        """A NotFound raised by ``create_table`` surfaces as FailedDeployException."""
        mock_dryrun().get_schema.return_value = self._f0_schema()
        bq = mock_client.return_value
        bq.get_table.side_effect = NotFound("table not found")
        bq.create_table.side_effect = NotFound(
            "404 POST https://bigquery.googleapis.com/..."
        )

        expected_failure = pytest.raises(
            deploy.FailedDeployException, match="Unable to create table"
        )
        with expected_failure:
            deploy.deploy_table(query_file=query_path / "query.sql")

    def test_deploy_table_without_schema_raises_skip(self, tmp_path):
        """A query directory without ``schema.yaml`` is skipped."""
        query_dir = self._make_query_dir(tmp_path / "sql")

        with pytest.raises(deploy.SkippedDeployException, match="Schema missing"):
            deploy.deploy_table(query_file=query_dir / "query.sql")

    def test_deploy_with_null_destination_raises_skip(self, tmp_path):
        """Metadata with ``destination_table: null`` under scheduling is skipped."""
        query_dir = self._make_query_dir(tmp_path / "sql")
        (query_dir / "schema.yaml").write_text(yaml.dump(self._f0_schema()))
        metadata = {"scheduling": {"destination_table": None}}
        (query_dir / "metadata.yaml").write_text(yaml.dump(metadata))

        expected_skip = pytest.raises(
            deploy.SkippedDeployException, match="null destination_table configured"
        )
        with expected_skip:
            deploy.deploy_table(query_file=query_dir / "query.sql")

    @patch("google.cloud.bigquery.Client")
    @patch("bigquery_etl.dryrun.DryRun")
    def test_deploy_table_already_exists(self, mock_dryrun, mock_client, query_path):
        """With no NotFound configured on ``get_table``, the table is updated."""
        mock_dryrun().get_schema.return_value = self._f0_schema()
        bq = mock_client.return_value

        deploy.deploy_table(query_file=query_path / "query.sql")

        bq.update_table.assert_called_once()
        bq.create_table.assert_not_called()

    @patch("google.cloud.bigquery.Client")
    @patch("bigquery_etl.dryrun.DryRun")
    def test_deploy_table_already_exists_skip_existing(
        self, mock_dryrun, mock_client, query_path
    ):
        """``skip_existing=True`` raises a skip when ``get_table`` does not fail."""
        # mock_client is not configured further: it is only patched in so the
        # default mock stands in for the BigQuery client.
        mock_dryrun().get_schema.return_value = self._f0_schema()

        with pytest.raises(deploy.SkippedDeployException, match="already exists"):
            deploy.deploy_table(
                query_file=query_path / "query.sql", skip_existing=True
            )

    @patch("bigquery_etl.dryrun.DryRun")
    def test_deploy_fails_when_schemas_dont_match(self, mock_dryrun, query_path):
        """A dry-run schema differing from ``schema.yaml`` fails the deploy."""
        mismatched_schema = {"fields": [{"name": "doesnt_exist", "type": "STRING"}]}
        mock_dryrun().get_schema.return_value = mismatched_schema

        expected_failure = pytest.raises(
            deploy.FailedDeployException, match="does not match schema"
        )
        with expected_failure:
            deploy.deploy_table(query_file=query_path / "query.sql")

    @patch("google.cloud.bigquery.Client")
    @patch("bigquery_etl.dryrun.DryRun")
    def test_force_deploy_with_not_matching_schemas(
        self, mock_dryrun, mock_client, query_path
    ):
        """``force=True`` creates the table even though the schemas differ."""
        mismatched_schema = {"fields": [{"name": "doesnt_exist", "type": "STRING"}]}
        mock_dryrun().get_schema.return_value = mismatched_schema
        bq = mock_client.return_value
        bq.get_table.side_effect = NotFound("table not found")

        deploy.deploy_table(query_file=query_path / "query.sql", force=True)

        bq.create_table.assert_called_once()