diff --git a/conftest.py b/conftest.py
index c5cad83e61..6a36dd6d1a 100644
--- a/conftest.py
+++ b/conftest.py
@@ -7,6 +7,10 @@
 import pytest
 import random
 import string
+
+TEST_BUCKET = "bigquery-etl-integration-test-bucket"
+
+
 pytest_plugins = [
     "bigquery_etl.pytest_plugin.sql",
     "bigquery_etl.pytest_plugin.udf",
@@ -23,21 +27,55 @@ def pytest_collection_modifyitems(config, items):
     if keywordexpr or markexpr:
         return
-    skip_integration = pytest.mark.skip(
-        reason='integration marker not selected'
-    )
+    skip_integration = pytest.mark.skip(reason="integration marker not selected")
     for item in items:
-        if 'integration' in item.keywords:
+        if "integration" in item.keywords:
             item.add_marker(skip_integration)
 
+
+@pytest.fixture
+def project_id():
+    # GOOGLE_PROJECT_ID needs to be set for integration tests to run
+    project_id = os.environ["GOOGLE_PROJECT_ID"]
+
+    return project_id
+
+
 @pytest.fixture
 def bigquery_client():
-    try:
-        project_id = os.environ["GOOGLE_PROJECT_ID"]
+    project_id = os.environ["GOOGLE_PROJECT_ID"]
+    return bigquery.Client(project_id)
+
+
+@pytest.fixture
+def temporary_dataset():
     # generate a random test dataset to avoid conflicts when running tests in parallel
-    test_dataset = "test_" + "".join(random.choice(string.ascii_lowercase) for i in range(10))
+    test_dataset = "test_" + "".join(
+        random.choice(string.ascii_lowercase) for i in range(12)
+    )
+    project_id = os.environ["GOOGLE_PROJECT_ID"]
+    client = bigquery.Client(project_id)
+    client.create_dataset(test_dataset)
+
+    yield test_dataset
+
+    # cleanup and remove temporary dataset
+    client.delete_dataset(test_dataset, delete_contents=True, not_found_ok=True)
+
+
+@pytest.fixture
+def test_bucket():
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(TEST_BUCKET)
+
+    yield bucket
+
+    # cleanup test bucket
+    bucket.delete_blobs(bucket.list_blobs())
+
+
+@pytest.fixture
 def storage_client():
-    pass
\ No newline at end of file
+    yield storage.Client()
diff --git a/tests/public_data/test_publish_public_data_json_script.py b/tests/public_data/test_publish_public_data_json_script.py
index 03b8c742c3..da4d99ad5b 100644
--- a/tests/public_data/test_publish_public_data_json_script.py
+++ b/tests/public_data/test_publish_public_data_json_script.py
@@ -1,5 +1,4 @@
 import json
-import os
 import pytest
 import subprocess
 import zlib
@@ -7,7 +6,6 @@ import zlib
 from pathlib import Path
 from datetime import datetime
 from google.cloud import bigquery
-from google.cloud import storage
 from google.api_core.exceptions import NotFound
 
 
@@ -16,194 +14,45 @@ TEST_DIR = Path(__file__).parent.parent
 @pytest.mark.integration
 class TestPublishJsonScript(object):
-    test_bucket = "bigquery-etl-integration-test-bucket"
-    project_id = os.environ["GOOGLE_PROJECT_ID"]
+    def test_script_incremental_query(
+        self,
+        storage_client,
+        test_bucket,
+        project_id,
+        temporary_dataset,
+        bigquery_client,
+    ):
+        incremental_sql_path = (
+            TEST_DIR
+            / "data"
+            / "test_sql"
+            / "test"
+            / "incremental_query_v1"
+            / "query.sql"
+        )
 
-    non_incremental_sql_path = (
-        f"{str(TEST_DIR)}/data/test_sql/test/" "non_incremental_query_v1/query.sql"
-    )
-
-    incremental_non_incremental_export_sql_path = (
-        f"{str(TEST_DIR)}/data/test_sql/test/"
-        "incremental_query_non_incremental_export_v1/query.sql"
-    )
-
-    incremental_sql_path = (
-        f"{str(TEST_DIR)}/data/test_sql/test/incremental_query_v1/query.sql"
-    )
-    incremental_parameter = "submission_date:DATE:2020-03-15"
-
-    no_metadata_sql_path = (
-        f"{str(TEST_DIR)}/data/test_sql/test/no_metadata_query_v1/query.sql"
-    )
-
-    client = bigquery.Client(project_id)
-    storage_client = storage.Client()
-    bucket = storage_client.bucket(test_bucket)
-
-    temp_table = f"{project_id}.tmp.incremental_query_v1_20200315_temp"
-    non_incremental_table = f"{project_id}.test.non_incremental_query_v1"
-    incremental_non_incremental_export_table = (
-        f"{project_id}.test.incremental_query_non_incremental_export_v1"
-    )
-
-    @pytest.fixture(autouse=True)
-    def setup(self):
-        # remove tables that might be there from previous failed tests
-        try:
-            self.client.delete_table(self.temp_table)
-        except NotFound:
-            pass
-
-        try:
-            self.client.get_table(self.non_incremental_table)
-        except NotFound:
-            date_partition = bigquery.table.TimePartitioning(field="d")
-            job_config = bigquery.QueryJobConfig(
-                destination=self.non_incremental_table, time_partitioning=date_partition
-            )
-
-            # create table for non-incremental query
-            with open(self.non_incremental_sql_path) as query_stream:
-                query = query_stream.read()
-                query_job = self.client.query(query, job_config=job_config)
-                query_job.result()
-
-        try:
-            self.client.get_table(self.incremental_non_incremental_export_table)
-        except NotFound:
-            date_partition = bigquery.table.TimePartitioning(field="d")
-            job_config = bigquery.QueryJobConfig(
-                destination=self.incremental_non_incremental_export_table,
-                time_partitioning=date_partition,
-            )
-
-            # create table for non-incremental query
-            with open(self.incremental_non_incremental_export_sql_path) as query_stream:
-                query = query_stream.read()
-                query_job = self.client.query(query, job_config=job_config)
-                query_job.result()
-
-        # remove json uploaded to storage by previous tests
-        try:
-            blob = self.bucket.blob("api/")
-            blob.delete()
-        except NotFound:
-            pass
-
-    @pytest.mark.dependency(name="test_script_incremental_query")
-    def test_script_incremental_query(self):
         res = subprocess.run(
             (
                 "./script/publish_public_data_json",
                 "publish_json",
                 "--parameter=a:INT64:9",
-                "--parameter=" + self.incremental_parameter,
-                "--query_file=" + self.incremental_sql_path,
-                "--target_bucket=" + self.test_bucket,
-                "--project_id=" + self.project_id,
+                "--parameter=submission_date:DATE:2020-03-15",
+                "--query_file=" + str(incremental_sql_path),
+                "--target_bucket=" + test_bucket.name,
+                "--project_id=" + project_id,
             )
         )
 
         assert res.returncode == 0
 
-    def test_script_incremental_query_no_parameter(self):
-        res = subprocess.run(
-            (
-                "./script/publish_public_data_json",
-                "publish_json",
-                "--query_file=" + self.incremental_sql_path,
-                "--target_bucket=" + self.test_bucket,
-                "--project_id=" + self.project_id,
-            )
-        )
-
-        assert res.returncode == 1
-
-    def test_query_without_metadata(self):
-        res = subprocess.run(
-            (
-                "./script/publish_public_data_json",
-                "publish_json",
-                "--query_file=" + self.no_metadata_sql_path,
-            )
-        )
-
-        assert res.returncode == 0
-
-    @pytest.mark.dependency(name="test_script_non_incremental_query")
-    def test_script_non_incremental_query(self):
-        res = subprocess.run(
-            (
-                "./script/publish_public_data_json",
-                "publish_json",
-                "--query_file=" + self.non_incremental_sql_path,
-                "--target_bucket=" + self.test_bucket,
-                "--project_id=" + self.project_id,
-            )
-        )
-
-        assert res.returncode == 0
-
-    @pytest.mark.dependency(name="test_script_non_incremental_export")
-    def test_script_non_incremental_export(self):
-        res = subprocess.run(
-            (
-                "./script/publish_public_data_json",
-                "publish_json",
-                "--parameter=a:INT64:9",
-                "--query_file=" + self.incremental_non_incremental_export_sql_path,
-                "--target_bucket=" + self.test_bucket,
-                "--project_id=" + self.project_id,
-                "--parameter=" + self.incremental_parameter,
-            )
-        )
-        assert res.returncode == 0
-
-    @pytest.mark.dependency(depends=["test_script_incremental_query"])
-    def test_temporary_tables_removed(self):
         with pytest.raises(NotFound):
-            self.client.get_table(self.temp_table)
+            temp_table = (
+                f"{project_id}.{temporary_dataset}.incremental_query_v1_20200315_temp"
+            )
+            bigquery_client.get_table(temp_table)
 
-    @pytest.mark.dependency(depends=["test_script_non_incremental_query"])
-    def test_non_incremental_query_gcs(self):
-        gcp_path = "api/v1/tables/test/non_incremental_query/v1/files/"
-        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
-
-        blob_count = 0
-
-        for blob in blobs:
-            blob_count += 1
-            compressed = blob.download_as_string()
-            uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
-            content = json.loads(uncompressed.decode("utf-8").strip())
-            assert len(content) == 3
-
-        assert blob_count == 1
-
-    @pytest.mark.dependency(depends=["test_script_non_incremental_export"])
-    def test_incremental_query_non_incremental_export_gcs(self):
-        gcp_path = (
-            "api/v1/tables/test/incremental_query_non_incremental_export/v1/files/"
-        )
-
-        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
-
-        blob_count = 0
-
-        for blob in blobs:
-            blob_count += 1
-            compressed = blob.download_as_string()
-            uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
-            content = json.loads(uncompressed.decode("utf-8").strip())
-            assert len(content) == 3
-
-        assert blob_count == 1
-
-    @pytest.mark.dependency(depends=["test_script_incremental_query"])
-    def test_incremental_query_gcs(self):
         gcp_path = "api/v1/tables/test/incremental_query/v1/files/2020-03-15/"
-        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
+        blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
 
         blob_count = 0
 
@@ -216,10 +65,8 @@ class TestPublishJsonScript(object):
 
         assert blob_count == 1
 
-    @pytest.mark.dependency(depends=["test_script_incremental_query"])
-    def test_last_updated(self):
         gcp_path = "api/v1/tables/test/incremental_query/v1/last_updated"
-        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
+        blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
 
         blob_count = 0
 
@@ -229,3 +76,167 @@ class TestPublishJsonScript(object):
         datetime.strptime(last_updated, "%Y-%m-%d %H:%M:%S")
 
         assert blob_count == 1
+
+    def test_script_incremental_query_no_parameter(
+        self, test_bucket, project_id, temporary_dataset, bigquery_client
+    ):
+        incremental_sql_path = (
+            TEST_DIR
+            / "data"
+            / "test_sql"
+            / "test"
+            / "incremental_query_v1"
+            / "query.sql"
+        )
+
+        res = subprocess.run(
+            (
+                "./script/publish_public_data_json",
+                "publish_json",
+                "--query_file=" + str(incremental_sql_path),
+                "--target_bucket=" + test_bucket.name,
+                "--project_id=" + project_id,
+            )
+        )
+
+        assert res.returncode == 1
+
+    def test_query_without_metadata(self):
+        no_metadata_sql_path = (
+            TEST_DIR
+            / "data"
+            / "test_sql"
+            / "test"
+            / "no_metadata_query_v1"
+            / "query.sql"
+        )
+
+        res = subprocess.run(
+            (
+                "./script/publish_public_data_json",
+                "publish_json",
+                "--query_file=" + str(no_metadata_sql_path),
+            )
+        )
+
+        assert res.returncode == 0
+
+    def test_script_non_incremental_query(
+        self,
+        bigquery_client,
+        storage_client,
+        test_bucket,
+        project_id,
+        temporary_dataset,
+    ):
+        non_incremental_sql_path = (
+            TEST_DIR
+            / "data"
+            / "test_sql"
+            / "test"
+            / "non_incremental_query_v1"
+            / "query.sql"
+        )
+
+        non_incremental_table = (
+            f"{project_id}.{temporary_dataset}.non_incremental_query_v1"
+        )
+        date_partition = bigquery.table.TimePartitioning(field="d")
+        job_config = bigquery.QueryJobConfig(
+            destination=non_incremental_table, time_partitioning=date_partition
+        )
+
+        with open(non_incremental_sql_path) as query_stream:
+            query = query_stream.read()
+            query_job = bigquery_client.query(query, job_config=job_config)
+            query_job.result()
+
+        res = subprocess.run(
+            (
+                "./script/publish_public_data_json",
+                "publish_json",
+                "--query_file=" + str(non_incremental_sql_path),
+                "--target_bucket=" + test_bucket.name,
+                "--project_id=" + project_id,
+            )
+        )
+
+        assert res.returncode == 0
+
+        gcp_path = "api/v1/tables/test/non_incremental_query/v1/files/"
+        blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
+
+        blob_count = 0
+
+        for blob in blobs:
+            blob_count += 1
+            compressed = blob.download_as_string()
+            uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
+            content = json.loads(uncompressed.decode("utf-8").strip())
+            assert len(content) == 3
+
+        assert blob_count == 1
+
+    def test_script_non_incremental_export(
+        self,
+        storage_client,
+        test_bucket,
+        project_id,
+        bigquery_client,
+        temporary_dataset,
+    ):
+        incremental_non_incremental_export_sql_path = (
+            TEST_DIR
+            / "data"
+            / "test_sql"
+            / "test"
+            / "incremental_query_non_incremental_export_v1"
+            / "query.sql"
+        )
+
+        incremental_non_incremental_export_table = (
+            f"{project_id}.{temporary_dataset}."
+            "incremental_query_non_incremental_export_v1"
+        )
+
+        date_partition = bigquery.table.TimePartitioning(field="d")
+        job_config = bigquery.QueryJobConfig(
+            destination=incremental_non_incremental_export_table,
+            time_partitioning=date_partition,
+        )
+
+        # create table for non-incremental query
+        with open(incremental_non_incremental_export_sql_path) as query_stream:
+            query = query_stream.read()
+            query_job = bigquery_client.query(query, job_config=job_config)
+            query_job.result()
+
+        res = subprocess.run(
+            (
+                "./script/publish_public_data_json",
+                "publish_json",
+                "--parameter=a:INT64:9",
+                "--query_file=" + str(incremental_non_incremental_export_sql_path),
+                "--target_bucket=" + test_bucket.name,
+                "--project_id=" + project_id,
+                "--parameter=submission_date:DATE:2020-03-15",
+            )
+        )
+        assert res.returncode == 0
+
+        gcp_path = (
+            "api/v1/tables/test/incremental_query_non_incremental_export/v1/files/"
+        )
+
+        blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
+
+        blob_count = 0
+
+        for blob in blobs:
+            blob_count += 1
+            compressed = blob.download_as_string()
+            uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
+            content = json.loads(uncompressed.decode("utf-8").strip())
+            assert len(content) == 3
+
+        assert blob_count == 1