Use streaming inserts for shredder state (#826)

Daniel Thorn 2020-03-18 21:21:12 +01:00, committed by GitHub
Parent 948a59d602
Commit 90d266c708
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 55 deletions

View file

@@ -12,11 +12,13 @@ from typing import Callable, Iterable, Optional
 import logging
 import warnings
 
+from google.api_core.exceptions import NotFound
 from google.cloud import bigquery
 
 from ..util.bigquery_id import FULL_JOB_ID_RE, full_job_id, sql_table_id
 from ..util.client_queue import ClientQueue
 from ..util.table_filter import add_table_filter_arguments, get_table_filter
+from ..util.exceptions import BigQueryInsertError
 from .config import DELETE_TARGETS
@@ -127,28 +129,21 @@ def record_state(client, task_id, job, dry_run, start_date, end_date, state_tabl
     insert_tense = "Would insert" if dry_run else "Inserting"
     logging.info(f"{insert_tense} {job_id} in {state_table} for task: {task_id}")
     if not dry_run:
-        client.query(
-            dedent(
-                f"""
-                INSERT INTO
-                  `{state_table}`(
-                    task_id,
-                    job_id,
-                    job_created,
-                    start_date,
-                    end_date
-                  )
-                VALUES
-                  (
-                    "{task_id}",
-                    "{job_id}",
-                    TIMESTAMP "{job.created:%Y-%m-%d %H:%M:%S} UTC",
-                    DATE "{start_date}",
-                    DATE "{end_date}"
-                  )
-                """
-            ).strip()
-        ).result()
+        BigQueryInsertError.raise_if_present(
+            errors=client.insert_rows_json(
+                state_table,
+                [
+                    {
+                        "task_id": task_id,
+                        "job_id": job_id,
+                        "job_created": job.created.isoformat(),
+                        "start_date": start_date.isoformat(),
+                        "end_date": end_date.isoformat(),
+                    }
+                ],
+                skip_invalid_rows=False,
+            )
+        )
 
 
 def wait_for_job(client, states, task_id, dry_run, create_job, **state_kwargs):
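Note: record_state now writes through the streaming API (tabledata.insertAll) instead of running an INSERT query job, so every value must be JSON-serializable. A minimal sketch of the row payload under that constraint, with made-up task and job IDs:

from datetime import date, datetime, timezone

# insert_rows_json expects JSON-serializable types, so TIMESTAMP and DATE
# columns are passed as ISO-8601 strings rather than datetime/date objects.
job_created = datetime(2020, 3, 18, 20, 21, 12, tzinfo=timezone.utc)
row = {
    "task_id": "telemetry_main",          # hypothetical task id
    "job_id": "project:US.bqjob_abc123",  # hypothetical job id
    "job_created": job_created.isoformat(),
    "start_date": date(2020, 3, 1).isoformat(),
    "end_date": date(2020, 3, 18).isoformat(),
}
# row is now safe to pass as client.insert_rows_json(state_table, [row])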
@@ -354,39 +349,44 @@ def main():
     client = client_q.default_client
     states = {}
     if args.state_table:
-        client.query(
-            dedent(
-                f"""
-                CREATE TABLE IF NOT EXISTS
-                  `{args.state_table}`(
-                    task_id STRING,
-                    job_id STRING,
-                    job_created TIMESTAMP,
-                    start_date DATE,
-                    end_date DATE
-                  )
-                """
-            ).strip()
-        )
-        states = {
-            row["task_id"]: row["job_id"]
-            for row in client.query(
-                dedent(
-                    f"""
-                    SELECT
-                      task_id,
-                      job_id,
-                    FROM
-                      `{args.state_table}`
-                    WHERE
-                      start_date = '{args.start_date}'
-                      AND end_date = '{args.end_date}'
-                    ORDER BY
-                      job_created
-                    """
-                ).strip()
-            ).result()
-        }
+        state_table_exists = False
+        try:
+            client.get_table(args.state_table)
+            state_table_exists = True
+        except NotFound:
+            if not args.dry_run:
+                client.create_table(
+                    bigquery.Table(
+                        args.state_table,
+                        [
+                            bigquery.SchemaField("task_id", "STRING"),
+                            bigquery.SchemaField("job_id", "STRING"),
+                            bigquery.SchemaField("job_created", "TIMESTAMP"),
+                            bigquery.SchemaField("start_date", "DATE"),
+                            bigquery.SchemaField("end_date", "DATE"),
+                        ],
+                    )
+                )
+                state_table_exists = True
+        if state_table_exists:
+            states = dict(
+                client.query(
+                    dedent(
+                        f"""
+                        SELECT
+                          task_id,
+                          job_id,
+                        FROM
+                          `{args.state_table}`
+                        WHERE
+                          start_date = '{args.start_date}'
+                          AND end_date = '{args.end_date}'
+                        ORDER BY
+                          job_created
+                        """
+                    ).strip()
+                ).result()
+            )
     tasks = [
         task
         for target, source in DELETE_TARGETS.items()
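Note: the new `states = dict(...)` works because google-cloud-bigquery Row objects support sequence-style iteration over their values, so each two-column result row unpacks as a (task_id, job_id) pair. A small sketch under that assumption, with illustrative values:

from google.cloud.bigquery.table import Row

# Row(values, field_to_index); iterating a Row yields its values in order,
# so a list of two-column rows converts directly into a mapping.
rows = [
    Row(("task-a", "job-1"), {"task_id": 0, "job_id": 1}),
    Row(("task-b", "job-2"), {"task_id": 0, "job_id": 1}),
]
states = dict(rows)
assert states == {"task-a": "job-1", "task-b": "job-2"}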

View file

@@ -0,0 +1,11 @@
+"""Common exception types."""
+
+
+class BigQueryInsertError(Exception):
+    """Wrap errors returned by BigQuery insertAll requests in an Exception."""
+
+    @staticmethod
+    def raise_if_present(errors):
+        """Raise this exception if errors is truthy."""
+        if errors:
+            raise BigQueryInsertError(errors)
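A hedged usage sketch tying the new helper to the streaming insert above; the import path and table name are assumptions for illustration:

from google.cloud import bigquery

from bigquery_etl.util.exceptions import BigQueryInsertError  # package path assumed

client = bigquery.Client()
BigQueryInsertError.raise_if_present(
    errors=client.insert_rows_json(
        "project.dataset.shredder_state",  # hypothetical state table
        [{"task_id": "example_task", "job_id": "example_job"}],
        skip_invalid_rows=False,
    )
)
# insert_rows_json returns [] on success, so raise_if_present is a no-op;
# on failure it returns per-row error mappings, which become the exception args.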