Add tests for publishing json
Parent: 825b6ccf0e
Commit: 438b87a6da
requirements.in
@@ -7,6 +7,7 @@ pytest-docstyle==1.5.0
 pytest-flake8==1.0.4
 pytest-mypy==0.4.2
 pytest-xdist==1.31.0
+pytest-dependency==0.5.1
 pytest==5.3.5
 PyYAML==5.3
 sqlparse==0.3.0
requirements.txt
@@ -229,6 +229,9 @@ pyrsistent==0.15.7 \
 pytest-black==0.3.8 \
     --hash=sha256:01a9a7acc69e618ebf3f834932a4d7a81909f6911051d0871b0ed4de3cbe9712 \
     # via -r requirements.in
+pytest-dependency==0.5.1 \
+    --hash=sha256:c2a892906192663f85030a6ab91304e508e546cddfe557d692d61ec57a1d946b \
+    # via -r requirements.in
 pytest-docstyle==1.5.0 \
     --hash=sha256:dcc54084b8e8282a83e50c6220c85d1c7d05e3871f74f0e911499b4f3adea756 \
     # via -r requirements.in
@@ -251,7 +254,7 @@ pytest==5.3.5 \
 pytest==5.3.5 \
     --hash=sha256:0d5fe9189a148acc3c3eb2ac8e1ac0742cb7618c084f3d228baaec0c254b318d \
     --hash=sha256:ff615c761e25eb25df19edddc0b970302d2a9091fbce0e7213298d85fb61fef6 \
-    # via -r requirements.in, pytest-black, pytest-docstyle, pytest-flake8, pytest-forked, pytest-mypy, pytest-xdist
+    # via -r requirements.in, pytest-black, pytest-dependency, pytest-docstyle, pytest-flake8, pytest-forked, pytest-mypy, pytest-xdist
 pytz==2019.3 \
     --hash=sha256:1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d \
     --hash=sha256:b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be \
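pytest-dependency, newly pinned here, lets tests be skipped automatically when the tests they depend on fail; the test module below relies on it to guard its GCS assertions behind the publish-script tests. A minimal illustration of the mechanism (the test names here are made up, not from this commit):

```python
import pytest


@pytest.mark.dependency(name="publish")
def test_publish():
    assert True  # stand-in for the real publish step


@pytest.mark.dependency(depends=["publish"])
def test_published_files():
    # pytest-dependency skips this test automatically
    # if test_publish failed or was skipped
    assert True
```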
script/publish_public_data_json
@@ -67,6 +67,8 @@ def publish_table_as_json(
     table_ref = client.get_table(result_table)
 
+    print("jsonn")
+
     job_config = bigquery.ExtractJobConfig()
     job_config.destination_format = "NEWLINE_DELIMITED_JSON"
     destination_uri = f"gs://{bucket}/" + prefix + "*.json"
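For context, `publish_table_as_json` drives a standard BigQuery extract job; a minimal standalone sketch of the same export path, with placeholder project, dataset, table, and bucket names that are not from this commit:

```python
from google.cloud import bigquery

# All names below are placeholders.
client = bigquery.Client("my-project")

job_config = bigquery.ExtractJobConfig()
job_config.destination_format = "NEWLINE_DELIMITED_JSON"

# A wildcard URI lets BigQuery shard large tables across multiple files,
# which is why the script builds "gs://{bucket}/" + prefix + "*.json".
extract_job = client.extract_table(
    "my-project.my_dataset.my_table",
    "gs://my-bucket/api/v1/files/*.json",
    job_config=job_config,
)
extract_job.result()  # block until the export finishes
```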
@@ -139,6 +141,8 @@ def main():
         return
 
+    client = bigquery.Client(args.project_id)
+
     if args.parameter:
         date_search = re.search(SUBMISSION_DATE_RE, args.parameter)
 
         if date_search:
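`SUBMISSION_DATE_RE` itself is not shown in this diff; a plausible equivalent of the parameter parsing, with the pattern below being an assumption rather than the script's actual constant:

```python
import re

# Assumed pattern; the script's real SUBMISSION_DATE_RE is not shown in this diff.
SUBMISSION_DATE_RE = r"^submission_date:DATE:(\d{4}-\d{2}-\d{2})$"

date_search = re.search(SUBMISSION_DATE_RE, "submission_date:DATE:2020-03-15")
if date_search:
    submission_date = date_search.group(1)
    print(submission_date)  # 2020-03-15
```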
tests/publish_public_data_json/test_publish_json.py
@@ -0,0 +1,168 @@
import json
import pytest
import subprocess

from google.cloud import bigquery
from google.cloud import storage
from google.api_core.exceptions import NotFound


class TestPublishJson(object):
    test_bucket = "moz-fx-data-stage-bigquery-etl"
    project_id = "moz-fx-data-shar-nonprod-efed"

    non_incremental_sql_path = (
        "tests/publish_public_data_json/test_sql/test/"
        "non_incremental_query_v1/query.sql"
    )

    incremental_sql_path = (
        "tests/publish_public_data_json/test_sql/test/incremental_query_v1/query.sql"
    )
    incremental_parameter = "submission_date:DATE:2020-03-15"

    no_metadata_sql_path = (
        "tests/publish_public_data_json/test_sql/test/no_metadata_query_v1/query.sql"
    )

    client = bigquery.Client(project_id)
    storage_client = storage.Client()
    bucket = storage_client.bucket(test_bucket)

    temp_table = f"{project_id}.test.incremental_query_v1_20200315_temp"
    non_incremental_table = f"{project_id}.test.non_incremental_query_v1"
    @pytest.fixture(autouse=True)
    def setup(self):
        # remove tables that might be left over from previous failed tests
        try:
            self.client.delete_table(self.temp_table)
        except NotFound:
            pass

        try:
            self.client.get_table(self.non_incremental_table)
        except NotFound:
            job_config = bigquery.QueryJobConfig(destination=self.non_incremental_table)

            # create the table for the non-incremental query
            with open(self.non_incremental_sql_path) as query_stream:
                query = query_stream.read()
                query_job = self.client.query(query, job_config=job_config)
                query_job.result()

        # remove json uploaded to storage by previous tests
        try:
            blob = self.bucket.blob("api/")
            blob.delete()
        except NotFound:
            pass
@pytest.mark.dependency(name="test_script_incremental_query")
|
||||
def test_script_incremental_query(self):
|
||||
res = subprocess.run(
|
||||
(
|
||||
"./script/publish_public_data_json",
|
||||
"publish_json",
|
||||
"--parameter=" + self.incremental_parameter,
|
||||
"--query_file=" + self.incremental_sql_path,
|
||||
"--target_bucket=" + self.test_bucket,
|
||||
"--project_id=" + self.project_id,
|
||||
)
|
||||
)
|
||||
|
||||
assert res.returncode == 0
|
||||
|
||||
def test_script_incremental_query_no_parameter(self):
|
||||
res = subprocess.run(
|
||||
(
|
||||
"./script/publish_public_data_json",
|
||||
"publish_json",
|
||||
"--query_file=" + self.incremental_sql_path,
|
||||
"--target_bucket=" + self.test_bucket,
|
||||
"--project_id=" + self.project_id,
|
||||
)
|
||||
)
|
||||
|
||||
assert res.returncode == 1
|
||||
|
||||
def test_query_without_metadata(self):
|
||||
res = subprocess.run(
|
||||
(
|
||||
"./script/publish_public_data_json",
|
||||
"publish_json",
|
||||
"--query_file=" + self.no_metadata_sql_path,
|
||||
)
|
||||
)
|
||||
|
||||
assert res.returncode == 0
|
||||
|
||||
@pytest.mark.dependency(name="test_script_non_incremental_query")
|
||||
def test_script_non_incremental_query(self):
|
||||
res = subprocess.run(
|
||||
(
|
||||
"./script/publish_public_data_json",
|
||||
"publish_json",
|
||||
"--query_file=" + self.non_incremental_sql_path,
|
||||
"--target_bucket=" + self.test_bucket,
|
||||
"--project_id=" + self.project_id,
|
||||
)
|
||||
)
|
||||
|
||||
assert res.returncode == 0
|
||||
|
||||
    @pytest.mark.dependency(depends=["test_script_incremental_query"])
    def test_temporary_tables_removed(self):
        with pytest.raises(NotFound):
            self.client.get_table(self.temp_table)

    @pytest.mark.dependency(depends=["test_script_non_incremental_query"])
    def test_non_incremental_query_gcs(self):
        gcp_path = "api/v1/tables/test/non_incremental_query/v1/files/"
        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)

        expected_content = """[
  {
    "a": "val1",
    "b": "2"
  },
  {
    "a": "val3",
    "b": "8"
  },
  {
    "a": "val2",
    "b": "34"
  }
]"""

        blob_len = 0

        for blob in blobs:
            # the order of keys in the json differs every time it is written;
            # sort keys to test whether the content is the same
            content = json.dumps(
                json.loads(blob.download_as_string().decode("utf-8").strip()),
                sort_keys=True,
                indent=2,
            )
            blob_len += 1
            assert content == expected_content

        assert blob_len == 1

    @pytest.mark.dependency(depends=["test_script_incremental_query"])
    def test_incremental_query_gcs(self):
        gcp_path = "api/v1/tables/test/incremental_query/v1/files/2020-03-15/"
        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)

        blob_len = 0

        for blob in blobs:
            content = json.loads(blob.download_as_string().decode("utf-8").strip())
            blob_len += 1
            assert len(content) == 3

        assert blob_len == 1

    # todo: test splitting of files larger than 1GB
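One note on the fixture above, outside the committed code: `self.bucket.blob("api/").delete()` in `setup` removes only an object literally named `api/`. If stale output under the whole prefix ever needs clearing, it would take an iteration like this sketch (bucket name copied from the test constants):

```python
from google.cloud import storage

storage_client = storage.Client()

# Delete every blob under the api/ prefix, not just a single "api/" object.
for blob in storage_client.list_blobs(
    "moz-fx-data-stage-bigquery-etl", prefix="api/"
):
    blob.delete()
```

These tests hit live GCP resources and are ordered via dependency markers, so they are presumably run in a single process (plain `pytest tests/publish_public_data_json/`) rather than distributed with pytest-xdist.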
tests/publish_public_data_json/test_sql/test/incremental_query_v1/metadata.yaml
@@ -0,0 +1,9 @@
friendly_name: Test table for an incremental query
description: >
  Test table for an incremental query
owners:
  - ascholtz@mozilla.com
labels:
  schedule: daily
  public_json: true
  incremental: true
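Judging by these labels and by `test_query_without_metadata` above, the publish script appears to gate exports on a `public_json` flag and to exit cleanly when no metadata file exists. A sketch of such a check, assuming the flags live under `labels` as in this file; the helper name is made up, not the script's actual API:

```python
import yaml  # PyYAML, pinned in requirements.in above


def should_publish_json(metadata_path):
    """Return True when the metadata labels opt the table into JSON publishing.

    Hypothetical helper: the publish script's real metadata handling is not
    shown in this diff.
    """
    with open(metadata_path) as f:
        metadata = yaml.safe_load(f) or {}
    return bool(metadata.get("labels", {}).get("public_json"))
```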
tests/publish_public_data_json/test_sql/test/incremental_query_v1/query.sql
@@ -0,0 +1,14 @@
SELECT
  DATE '2020-03-15' AS d,
  "val1" AS a,
  2 AS b
UNION ALL
SELECT
  DATE '2020-03-15' AS d,
  "val2" AS a,
  34 AS b
UNION ALL
SELECT
  DATE '2020-03-15' AS d,
  "val3" AS a,
  8 AS b
tests/publish_public_data_json/test_sql/test/no_metadata_query_v1/query.sql
@@ -0,0 +1,2 @@
SELECT
  123
tests/publish_public_data_json/test_sql/test/non_incremental_query_v1/metadata.yaml
@@ -0,0 +1,9 @@
friendly_name: Test table for a non-incremental query
description: >
  Test table for a non-incremental query
owners:
  - ascholtz@mozilla.com
labels:
  schedule: daily
  public_json: true
  incremental: false
tests/publish_public_data_json/test_sql/test/non_incremental_query_v1/query.sql
@@ -0,0 +1,11 @@
SELECT
  "val1" AS a,
  2 AS b
UNION ALL
SELECT
  "val2" AS a,
  34 AS b
UNION ALL
SELECT
  "val3" AS a,
  8 AS b