This commit is contained in:
Anna Scholtz 2020-04-29 16:13:03 -07:00
Parent b3ddfc3cf5
Commit f961ed3137
2 changed files: 236 additions and 187 deletions

View file

@@ -7,6 +7,10 @@ import pytest
import random
import string
TEST_BUCKET = "bigquery-etl-integration-test-bucket"
pytest_plugins = [
"bigquery_etl.pytest_plugin.sql",
"bigquery_etl.pytest_plugin.udf",
@@ -23,21 +27,55 @@ def pytest_collection_modifyitems(config, items):
if keywordexpr or markexpr:
return
skip_integration = pytest.mark.skip(reason="integration marker not selected")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip_integration)
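# With this hook in place, integration tests are skipped by default and only run
# when selected explicitly, e.g. `pytest -m integration`; passing any -k or -m
# expression bypasses the skip because the hook returns early once keywordexpr
# or markexpr is set.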
@pytest.fixture
def project_id():
# GOOGLE_PROJECT_ID needs to be set for integration tests to run
project_id = os.environ["GOOGLE_PROJECT_ID"]
return project_id
@pytest.fixture
def bigquery_client():
project_id = os.environ["GOOGLE_PROJECT_ID"]
return bigquery.Client(project_id)
@pytest.fixture
def temporary_dataset():
# generate a random test dataset to avoid conflicts when running tests in parallel
test_dataset = "test_" + "".join(random.choice(string.ascii_lowercase) for i in range(10))
test_dataset = "test_" + "".join(
random.choice(string.ascii_lowercase) for i in range(12)
)
project_id = os.environ["GOOGLE_PROJECT_ID"]
client = bigquery.Client(project_id)
client.create_dataset(test_dataset)
yield test_dataset
# cleanup and remove temporary dataset
client.delete_dataset(test_dataset, delete_contents=True, not_found_ok=True)
@pytest.fixture
def test_bucket():
storage_client = storage.Client()
bucket = storage_client.bucket(TEST_BUCKET)
yield bucket
# cleanup test bucket
bucket.delete_blobs(bucket.list_blobs())
@pytest.fixture
def storage_client():
yield storage.Client()
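The fixtures above are picked up by the integration tests in the file below through pytest's name-based injection: a test simply lists the fixture names as parameters. A minimal sketch of that pattern (the test name and assertion here are hypothetical, not part of this commit):

import pytest


@pytest.mark.integration
def test_temporary_dataset_exists(bigquery_client, temporary_dataset, project_id):
    # pytest resolves each parameter against the fixtures defined in conftest.py
    dataset = bigquery_client.get_dataset(f"{project_id}.{temporary_dataset}")
    assert dataset.dataset_id == temporary_dataset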

View file

@@ -1,5 +1,4 @@
import json
import os
import pytest
import subprocess
import zlib
@@ -7,7 +6,6 @@ import zlib
from pathlib import Path
from datetime import datetime
from google.cloud import bigquery
from google.cloud import storage
from google.api_core.exceptions import NotFound
@@ -16,194 +14,45 @@ TEST_DIR = Path(__file__).parent.parent
@pytest.mark.integration
class TestPublishJsonScript(object):
test_bucket = "bigquery-etl-integration-test-bucket"
project_id = os.environ["GOOGLE_PROJECT_ID"]
def test_script_incremental_query(
self,
storage_client,
test_bucket,
project_id,
temporary_dataset,
bigquery_client,
):
incremental_sql_path = (
TEST_DIR
/ "data"
/ "test_sql"
/ "test"
/ "incremental_query_v1"
/ "query.sql"
)
non_incremental_sql_path = (
f"{str(TEST_DIR)}/data/test_sql/test/" "non_incremental_query_v1/query.sql"
)
incremental_non_incremental_export_sql_path = (
f"{str(TEST_DIR)}/data/test_sql/test/"
"incremental_query_non_incremental_export_v1/query.sql"
)
incremental_sql_path = (
f"{str(TEST_DIR)}/data/test_sql/test/incremental_query_v1/query.sql"
)
incremental_parameter = "submission_date:DATE:2020-03-15"
no_metadata_sql_path = (
f"{str(TEST_DIR)}/data/test_sql/test/no_metadata_query_v1/query.sql"
)
client = bigquery.Client(project_id)
storage_client = storage.Client()
bucket = storage_client.bucket(test_bucket)
temp_table = f"{project_id}.tmp.incremental_query_v1_20200315_temp"
non_incremental_table = f"{project_id}.test.non_incremental_query_v1"
incremental_non_incremental_export_table = (
f"{project_id}.test.incremental_query_non_incremental_export_v1"
)
@pytest.fixture(autouse=True)
def setup(self):
# remove tables that might be there from previous failed tests
try:
self.client.delete_table(self.temp_table)
except NotFound:
pass
try:
self.client.get_table(self.non_incremental_table)
except NotFound:
date_partition = bigquery.table.TimePartitioning(field="d")
job_config = bigquery.QueryJobConfig(
destination=self.non_incremental_table, time_partitioning=date_partition
)
# create table for non-incremental query
with open(self.non_incremental_sql_path) as query_stream:
query = query_stream.read()
query_job = self.client.query(query, job_config=job_config)
query_job.result()
try:
self.client.get_table(self.incremental_non_incremental_export_table)
except NotFound:
date_partition = bigquery.table.TimePartitioning(field="d")
job_config = bigquery.QueryJobConfig(
destination=self.incremental_non_incremental_export_table,
time_partitioning=date_partition,
)
# create table for the incremental query with non-incremental export
with open(self.incremental_non_incremental_export_sql_path) as query_stream:
query = query_stream.read()
query_job = self.client.query(query, job_config=job_config)
query_job.result()
# remove json uploaded to storage by previous tests
try:
blob = self.bucket.blob("api/")
blob.delete()
except NotFound:
pass
@pytest.mark.dependency(name="test_script_incremental_query")
def test_script_incremental_query(self):
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--parameter=a:INT64:9",
"--parameter=" + self.incremental_parameter,
"--query_file=" + self.incremental_sql_path,
"--target_bucket=" + self.test_bucket,
"--project_id=" + self.project_id,
"--parameter=submission_date:DATE:2020-03-15",
"--query_file=" + str(incremental_sql_path),
"--target_bucket=" + test_bucket.name,
"--project_id=" + project_id,
)
)
assert res.returncode == 0
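# The publisher is exercised end to end through its command line interface:
# subprocess.run returns a CompletedProcess, and a returncode of 0 means
# publish_public_data_json accepted the query file, parameters, target bucket,
# and project id passed above.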
def test_script_incremental_query_no_parameter(self):
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + self.incremental_sql_path,
"--target_bucket=" + self.test_bucket,
"--project_id=" + self.project_id,
)
)
assert res.returncode == 1
def test_query_without_metadata(self):
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + self.no_metadata_sql_path,
)
)
assert res.returncode == 0
@pytest.mark.dependency(name="test_script_non_incremental_query")
def test_script_non_incremental_query(self):
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + self.non_incremental_sql_path,
"--target_bucket=" + self.test_bucket,
"--project_id=" + self.project_id,
)
)
assert res.returncode == 0
@pytest.mark.dependency(name="test_script_non_incremental_export")
def test_script_non_incremental_export(self):
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--parameter=a:INT64:9",
"--query_file=" + self.incremental_non_incremental_export_sql_path,
"--target_bucket=" + self.test_bucket,
"--project_id=" + self.project_id,
"--parameter=" + self.incremental_parameter,
)
)
assert res.returncode == 0
@pytest.mark.dependency(depends=["test_script_incremental_query"])
def test_temporary_tables_removed(self):
with pytest.raises(NotFound):
temp_table = (
f"{project_id}.{temporary_dataset}.incremental_query_v1_20200315_temp"
)
bigquery_client.get_table(temp_table)
@pytest.mark.dependency(depends=["test_script_non_incremental_query"])
def test_non_incremental_query_gcs(self):
gcp_path = "api/v1/tables/test/non_incremental_query/v1/files/"
blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
blob_count = 0
for blob in blobs:
blob_count += 1
compressed = blob.download_as_string()
uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
content = json.loads(uncompressed.decode("utf-8").strip())
assert len(content) == 3
assert blob_count == 1
@pytest.mark.dependency(depends=["test_script_non_incremental_export"])
def test_incremental_query_non_incremental_export_gcs(self):
gcp_path = (
"api/v1/tables/test/incremental_query_non_incremental_export/v1/files/"
)
blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
blob_count = 0
for blob in blobs:
blob_count += 1
compressed = blob.download_as_string()
uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
content = json.loads(uncompressed.decode("utf-8").strip())
assert len(content) == 3
assert blob_count == 1
@pytest.mark.dependency(depends=["test_script_incremental_query"])
def test_incremental_query_gcs(self):
gcp_path = "api/v1/tables/test/incremental_query/v1/files/2020-03-15/"
blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
blob_count = 0
@@ -216,10 +65,8 @@ class TestPublishJsonScript(object):
assert blob_count == 1
@pytest.mark.dependency(depends=["test_script_incremental_query"])
def test_last_updated(self):
gcp_path = "api/v1/tables/test/incremental_query/v1/last_updated"
blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
blob_count = 0
@@ -229,3 +76,167 @@ class TestPublishJsonScript(object):
datetime.strptime(last_updated, "%Y-%m-%d %H:%M:%S")
assert blob_count == 1
def test_script_incremental_query_no_parameter(
self, test_bucket, project_id, temporary_dataset, bigquery_client
):
incremental_sql_path = (
TEST_DIR
/ "data"
/ "test_sql"
/ "test"
/ "incremental_query_v1"
/ "query.sql"
)
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + str(incremental_sql_path),
"--target_bucket=" + test_bucket.name,
"--project_id=" + project_id,
)
)
assert res.returncode == 1
def test_query_without_metadata(self):
no_metadata_sql_path = (
TEST_DIR
/ "data"
/ "test_sql"
/ "test"
/ "no_metadata_query_v1"
/ "query.sql"
)
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + str(no_metadata_sql_path),
)
)
assert res.returncode == 0
def test_script_non_incremental_query(
self,
bigquery_client,
storage_client,
test_bucket,
project_id,
temporary_dataset,
):
non_incremental_sql_path = (
TEST_DIR
/ "data"
/ "test_sql"
/ "test"
/ "non_incremental_query_v1"
/ "query.sql"
)
non_incremental_table = (
f"{project_id}.{temporary_dataset}.non_incremental_query_v1"
)
date_partition = bigquery.table.TimePartitioning(field="d")
job_config = bigquery.QueryJobConfig(
destination=non_incremental_table, time_partitioning=date_partition
)
with open(non_incremental_sql_path) as query_stream:
query = query_stream.read()
query_job = bigquery_client.query(query, job_config=job_config)
query_job.result()
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--query_file=" + str(non_incremental_sql_path),
"--target_bucket=" + test_bucket.name,
"--project_id=" + project_id,
)
)
assert res.returncode == 0
gcp_path = "api/v1/tables/test/non_incremental_query/v1/files/"
blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
blob_count = 0
for blob in blobs:
blob_count += 1
compressed = blob.download_as_string()
uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
content = json.loads(uncompressed.decode("utf-8").strip())
assert len(content) == 3
assert blob_count == 1
def test_script_non_incremental_export(
self,
storage_client,
test_bucket,
project_id,
bigquery_client,
temporary_dataset,
):
incremental_non_incremental_export_sql_path = (
TEST_DIR
/ "data"
/ "test_sql"
/ "test"
/ "incremental_query_non_incremental_export_v1"
/ "query.sql"
)
incremental_non_incremental_export_table = (
f"{project_id}.{temporary_dataset}."
"incremental_query_non_incremental_export_v1"
)
date_partition = bigquery.table.TimePartitioning(field="d")
job_config = bigquery.QueryJobConfig(
destination=incremental_non_incremental_export_table,
time_partitioning=date_partition,
)
# create table for the incremental query with non-incremental export
with open(incremental_non_incremental_export_sql_path) as query_stream:
query = query_stream.read()
query_job = bigquery_client.query(query, job_config=job_config)
query_job.result()
res = subprocess.run(
(
"./script/publish_public_data_json",
"publish_json",
"--parameter=a:INT64:9",
"--query_file=" + str(incremental_non_incremental_export_sql_path),
"--target_bucket=" + test_bucket.name,
"--project_id=" + project_id,
"--parameter=submission_date:DATE:2020-03-15",
)
)
assert res.returncode == 0
gcp_path = (
"api/v1/tables/test/incremental_query_non_incremental_export/v1/files/"
)
blobs = storage_client.list_blobs(test_bucket, prefix=gcp_path)
blob_count = 0
for blob in blobs:
blob_count += 1
compressed = blob.download_as_string()
uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
content = json.loads(uncompressed.decode("utf-8").strip())
assert len(content) == 3
assert blob_count == 1
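The assertions above decompress each exported blob with zlib.decompress(compressed, 16 + zlib.MAX_WBITS); the extra 16 in the wbits argument tells zlib to expect a gzip wrapper, which is why the downloaded files decode cleanly as JSON. A minimal standalone sketch of that round trip, using made-up sample rows rather than real query output:

import gzip
import json
import zlib

# three sample rows standing in for published query results
rows = [{"d": "2020-03-15", "n": i} for i in range(3)]
compressed = gzip.compress(json.dumps(rows).encode("utf-8"))

# wbits = 16 + MAX_WBITS means "decode a gzip container", matching the tests above
uncompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
assert json.loads(uncompressed.decode("utf-8")) == rows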