Refactor json export
This commit is contained in:
Родитель
0b00e222bf
Коммит
9ab5babcdb
|
@ -13,6 +13,8 @@ METADATA_FILE = "metadata.yaml"
|
|||
SUBMISSION_DATE_RE = re.compile(r"^submission_date:DATE:(\d\d\d\d-\d\d-\d\d)$")
|
||||
QUERY_FILE_RE = re.compile(r"^.*/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+)_(v[0-9]+)/query\.sql$")
|
||||
MAX_JSON_SIZE = 1 * 1024 * 1024 * 1024 # 1 GB as max. size of exported JSON files
|
||||
# maximum number of JSON output files; output file names are at most 12 characters long
|
||||
MAX_FILE_COUNT = 999_999_999_999
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, format="%(asctime)s: %(levelname)s: %(message)s"
|
||||
|
@ -46,6 +48,8 @@ class JsonPublisher:
|
|||
|
||||
self.metadata = Metadata.of_sql_file(self.query_file)
|
||||
|
||||
# only for incremental exports are files written into separate directories
|
||||
# for each date; date parameters are ignored for non-incremental exports
|
||||
if self.metadata.is_incremental() and self.parameter:
|
||||
for p in self.parameter:
|
||||
date_search = re.search(SUBMISSION_DATE_RE, p)
|
||||
|
@ -154,7 +158,7 @@ class JsonPublisher:
|
|||
output_file.write("]")
|
||||
output_file.close()
|
||||
|
||||
if output_file_counter >= 1000000000000:
|
||||
if output_file_counter >= MAX_FILE_COUNT:
|
||||
logging.error(
|
||||
"Maximum number of JSON output files reached."
|
||||
)
|
||||
|
@ -169,17 +173,17 @@ class JsonPublisher:
|
|||
logging.info(f"""Write {blob_path} to {tmp_blob_path}""")
|
||||
|
||||
output_file = smart_open.open(tmp_blob_path, "w")
|
||||
output_file.write("[\n")
|
||||
output_file.write("[")
|
||||
first_line = True
|
||||
output_file_counter += 1
|
||||
output_size = 3
|
||||
|
||||
# no separator is written before the first line; it has no preceding JSON object
|
||||
if not first_line:
|
||||
output_file.write(",\n")
|
||||
output_file.write(",")
|
||||
|
||||
output_file.write(line.replace("\n", ""))
|
||||
output_size += len(line) + 1
|
||||
output_size += len(line)
|
||||
first_line = False
|
||||
|
||||
output_file.write("]")
|
||||
|
|
|
@ -148,5 +148,5 @@ class TestPublishJson(object):
|
|||
)
|
||||
|
||||
file_handler.write.assert_has_calls(
|
||||
[call("[\n"), call('{"a": 1}'), call(",\n"), call('{"b": "cc"}'), call("]")]
|
||||
[call("["), call('{"a": 1}'), call(","), call('{"b": "cc"}'), call("]")]
|
||||
)
|
||||
|
|
|
@ -46,7 +46,10 @@ class TestPublishJsonScript(object):
|
|||
try:
|
||||
self.client.get_table(self.non_incremental_table)
|
||||
except NotFound:
|
||||
job_config = bigquery.QueryJobConfig(destination=self.non_incremental_table)
|
||||
date_partition = bigquery.table.TimePartitioning(field="d")
|
||||
job_config = bigquery.QueryJobConfig(
|
||||
destination=self.non_incremental_table, time_partitioning=date_partition
|
||||
)
|
||||
|
||||
# create table for non-incremental query
|
||||
with open(self.non_incremental_sql_path) as query_stream:
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
SELECT
|
||||
DATE '2020-03-15' AS d,
|
||||
"val1" AS a,
|
||||
2 AS b
|
||||
UNION ALL
|
||||
SELECT
|
||||
DATE '2020-03-16' AS d,
|
||||
"val2" AS a,
|
||||
34 AS b
|
||||
UNION ALL
|
||||
SELECT
|
||||
DATE '2020-03-16' AS d,
|
||||
"val3" AS a,
|
||||
8 AS b
|
||||
|
|
Загрузка…
Ссылка в новой задаче