Write last_updated to file when data gets published
This commit is contained in:
Parent: e2e749f6bb
Commit: 79feea7b8f
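This change moves responsibility for the `last_updated` timestamp: `GcsTableMetadata` no longer derives it from GCS blob metadata, and `JsonPublisher.publish_json()` now records `datetime.datetime.utcnow()` when a publish run starts, writing it in `%Y-%m-%d %H:%M:%S` format to a `last_updated` file alongside the table's exported files once the export completes.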
@@ -68,7 +68,6 @@ class GcsTableMetadata:
         self.last_updated_path = self.blobs[0].name.split("files")[0] + "last_updated"
         self.last_updated_uri = endpoint + self.last_updated_path
-        self.last_updated = min(self.blobs, key=lambda b: b.updated).updated
 
     def table_metadata_to_json(self):
         """Return a JSON object of the table metadata for GCS."""

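The removed line computed `last_updated` from the blobs themselves, taking the earliest `updated` time among a table's files, as the deleted `test_last_updated` further down asserts. A minimal sketch of that old behavior, using the timestamps from that test:

from datetime import datetime
from unittest.mock import Mock

# Mock blobs with the update times used in the deleted test_last_updated.
blobs = [
    Mock(updated=datetime(2020, 4, 3, 11, 25, 5)),
    Mock(updated=datetime(2020, 4, 3, 11, 20, 5)),
    Mock(updated=datetime(2020, 4, 3, 12, 20, 5)),
]
# min() over blob.updated picks the oldest file's timestamp.
assert min(blobs, key=lambda b: b.updated).updated == datetime(2020, 4, 3, 11, 20, 5)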
@@ -167,16 +166,6 @@ def publish_table_metadata(table_metadata, bucket):
             fout.write(json.dumps(metadata.files_metadata_to_json(), indent=4))
-
-
-def publish_last_modified(table_metadata, bucket):
-    """Write the timestamp when files of the dataset were last modified to GCS."""
-    for metadata in table_metadata:
-        output_file = f"gs://{bucket}/{metadata.last_updated_path}"
-
-        logging.info(f"Write last_updated to {output_file}")
-        with smart_open.open(output_file, "w") as fout:
-            fout.write(metadata.last_updated.strftime("%Y-%m-%d %H:%M:%S"))
 
 
 def main():
     """Generate and upload GCS metadata."""
     args = parser.parse_args()

@@ -200,7 +189,6 @@ def main():
         output_file = f"gs://{args.target_bucket}/all_datasets.json"
         publish_all_datasets_metadata(gcs_table_metadata, output_file)
         publish_table_metadata(gcs_table_metadata, args.target_bucket)
-        publish_last_modified(gcs_table_metadata, args.target_bucket)
     else:
         print(
             f"Invalid target: {args.target}, target must be a directory with"

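With the standalone `publish_last_modified()` helper and its call in `main()` gone, the remaining hunks move this responsibility into `JsonPublisher`, which writes the timestamp as part of `publish_json()`.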
@@ -3,6 +3,7 @@
 from argparse import ArgumentParser
 from google.cloud import storage
 from google.cloud import bigquery
+import datetime
 import smart_open
 import logging
 import sys

@@ -89,6 +90,8 @@ class JsonPublisher:
 
     def publish_json(self):
         """Publish query results as JSON to GCP Storage bucket."""
+        self.last_updated = datetime.datetime.utcnow()
+
         if self.metadata.is_incremental_export():
             if self.date is None:
                 logging.error(

@@ -105,6 +108,8 @@
             result_table = f"{self.dataset}.{self.table}_{self.version}"
             self._publish_table_as_json(result_table)
+
+        self._publish_last_updated()
 
     def _publish_table_as_json(self, result_table):
         """Export the `result_table` data as JSON to Cloud Storage."""
         prefix = (

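Note the ordering: `last_updated` is captured at the top of `publish_json()` (previous hunk) but only written out by `_publish_last_updated()` after `_publish_table_as_json()` has finished, so the published timestamp reflects when this publishing run began rather than when the write completed.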
@@ -234,6 +239,19 @@ class JsonPublisher:
         query_job = self.client.query(sql, job_config=job_config)
         query_job.result()
 
+    def _publish_last_updated(self):
+        """Write the timestamp when files of the dataset were last modified to GCS."""
+        last_updated_path = (
+            f"api/{self.api_version}/tables/{self.dataset}/"
+            f"{self.table}/{self.version}/last_updated"
+        )
+        output_file = f"gs://{self.target_bucket}/{last_updated_path}"
+
+        logging.info(f"Write last_updated to {output_file}")
+
+        with smart_open.open(output_file, "w") as fout:
+            fout.write(self.last_updated.strftime("%Y-%m-%d %H:%M:%S"))
+
 
 parser = ArgumentParser(description=__doc__)
 parser.add_argument(

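Since `GcsTableMetadata` still exposes `last_updated_path` and `last_updated_uri` (first hunk), the file written here is reachable over the metadata endpoint. A minimal sketch, not part of the commit, of how a consumer might read it back; the endpoint URL is hypothetical, while the path and timestamp format come from the diff:

from datetime import datetime
from urllib.request import urlopen

ENDPOINT = "https://example.com/"  # hypothetical public endpoint for the bucket
path = "api/v1/tables/test/incremental_query/v1/last_updated"

with urlopen(ENDPOINT + path) as response:
    last_updated = datetime.strptime(
        response.read().decode("utf-8"), "%Y-%m-%d %H:%M:%S"
    )
print(last_updated.isoformat())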
@@ -94,7 +94,6 @@ class TestPublishGcsMetadata(object):
         assert gcs_table_metadata.metadata.review_bug() == "1999999"
         assert gcs_table_metadata.last_updated_path == last_updated_path
         assert gcs_table_metadata.last_updated_uri == self.endpoint + last_updated_path
-        assert gcs_table_metadata.last_updated == datetime(2020, 4, 3, 11, 30, 1)
 
     def test_gcs_table_metadata_to_json(self):
         mock_blob = Mock()

@@ -279,46 +278,3 @@ class TestPublishGcsMetadata(object):
                 call(json.dumps(expected_incremental_query_json, indent=4)),
             ]
         )
-
-    def test_last_updated(self):
-        mock_blob1 = Mock()
-        mock_blob1.name = (
-            "api/v1/tables/test/non_incremental_query/v1/files/000000000000.json.gz"
-        )
-        mock_blob1.updated = datetime(2020, 4, 3, 11, 25, 5)
-
-        mock_blob2 = Mock()
-        mock_blob2.name = (
-            "api/v1/tables/test/non_incremental_query/v1/files/000000000001.json.gz"
-        )
-        mock_blob2.updated = datetime(2020, 4, 3, 11, 20, 5)
-
-        mock_blob3 = Mock()
-        mock_blob3.name = (
-            "api/v1/tables/test/non_incremental_query/v1/files/000000000002.json.gz"
-        )
-        mock_blob3.updated = datetime(2020, 4, 3, 12, 20, 5)
-
-        gcs_metadata = pgm.GcsTableMetadata(
-            [mock_blob1, mock_blob2, mock_blob3], self.endpoint, self.sql_dir
-        )
-        assert gcs_metadata.last_updated == datetime(2020, 4, 3, 11, 20, 5)
-
-    def test_publish_last_updated_to_gcs(self):
-        mock_blob1 = Mock()
-        mock_blob1.name = (
-            "api/v1/tables/test/non_incremental_query/v1/files/000000000000.json.gz"
-        )
-        mock_blob1.updated = datetime(2020, 4, 3, 11, 25, 5)
-
-        gcs_table_metadata = [
-            pgm.GcsTableMetadata([mock_blob1], self.endpoint, self.sql_dir)
-        ]
-
-        mock_out = MagicMock()
-        file_handler = MagicMock()
-        file_handler.__enter__.return_value = mock_out
-        smart_open.open = MagicMock(return_value=file_handler)
-
-        pgm.publish_last_modified(gcs_table_metadata, self.test_bucket)
-        mock_out.write.assert_called_with("2020-04-03 11:25:05")

@@ -155,3 +155,28 @@ class TestPublishJson(object):
         file_handler.write.assert_has_calls(
             [call("["), call('{"a": 1}'), call(","), call('{"b": "cc"}'), call("]")]
         )
+
+    def test_publish_last_updated_to_gcs(self):
+        publisher = JsonPublisher(
+            self.mock_client,
+            self.mock_storage_client,
+            self.project_id,
+            self.incremental_sql_path,
+            self.api_version,
+            self.test_bucket,
+            ["submission_date:DATE:2020-03-15"],
+        )
+
+        mock_out = MagicMock(side_effect=[['{"a": 1}', '{"b": "cc"}'], None])
+        mock_out.__iter__ = Mock(return_value=iter(['{"a": 1}', '{"b": "cc"}']))
+        file_handler = MagicMock()
+        file_handler.__enter__.return_value = mock_out
+
+        smart_open.open = MagicMock(return_value=file_handler)
+
+        publisher.publish_json()
+
+        assert publisher.last_updated is not None
+        mock_out.write.assert_called_with(
+            publisher.last_updated.strftime("%Y-%m-%d %H:%M:%S")
+        )

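The new test replaces `smart_open.open` at module level, which leaks the mock into later tests unless something restores it. A sketch of the same check using `unittest.mock.patch` to scope the replacement; the `publisher` fixture is hypothetical and stands in for a `JsonPublisher` built exactly as in the hunk above:

from unittest.mock import MagicMock, Mock, patch

def test_publish_last_updated_scoped(publisher):
    # `publisher`: a JsonPublisher constructed as in test_publish_last_updated_to_gcs.
    mock_out = MagicMock()
    mock_out.__iter__ = Mock(return_value=iter(['{"a": 1}', '{"b": "cc"}']))
    file_handler = MagicMock()
    file_handler.__enter__.return_value = mock_out

    # patch() restores smart_open.open when the block exits, so the mock
    # cannot leak into later tests.
    with patch("smart_open.open", MagicMock(return_value=file_handler)):
        publisher.publish_json()

    mock_out.write.assert_called_with(
        publisher.last_updated.strftime("%Y-%m-%d %H:%M:%S")
    )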
@@ -4,7 +4,7 @@ import pytest
 import subprocess
 import zlib
 
 from pathlib import Path
+from datetime import datetime
 from google.cloud import bigquery
 from google.cloud import storage
 from google.api_core.exceptions import NotFound

@@ -214,3 +214,17 @@ class TestPublishJsonScript(object):
         assert len(content) == 3
 
         assert blob_count == 1
+
+    @pytest.mark.dependency(depends=["test_script_incremental_query"])
+    def test_last_updated(self):
+        gcp_path = "api/v1/tables/test/incremental_query/v1/last_updated"
+        blobs = self.storage_client.list_blobs(self.test_bucket, prefix=gcp_path)
+
+        blob_count = 0
+
+        for blob in blobs:
+            blob_count += 1
+            last_updated = blob.download_as_string()
+            datetime.strptime(last_updated.decode("utf-8"), "%Y-%m-%d %H:%M:%S")
+
+        assert blob_count == 1