Add --preceding-days option to copy_deduplicate (#413)
This commit is contained in:
Parent
bb394b63fd
Commit
9bf053de74
|
@ -32,10 +32,15 @@ WITH
|
|||
FROM
|
||||
`{live_table}`
|
||||
WHERE
|
||||
DATE(submission_timestamp) = DATE(@start_time)
|
||||
DATE(submission_timestamp) >= DATE_SUB(
|
||||
DATE(@start_time),
|
||||
INTERVAL @num_preceding_days DAY
|
||||
)
|
||||
AND submission_timestamp < @end_time
|
||||
GROUP BY
|
||||
document_id),
|
||||
document_id
|
||||
HAVING
|
||||
submission_timestamp >= @start_time),
|
||||
-- A single slice of a live ping table.
|
||||
base AS (
|
||||
SELECT
|
||||
|
@ -123,6 +128,13 @@ parser.add_argument(
|
|||
const=24,
|
||||
help="Deduplicate one hour at a time; equivalent to --slices=24",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--preceding_days",
|
||||
"--preceding-days",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Number of days preceding --date that should be used to filter out duplicates",
|
||||
)
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument(
|
||||
"--only",
|
||||
|
@ -196,7 +208,9 @@ def sql_full_table_id(table):
|
|||
return f"{table.project}.{table.dataset_id}.{table.table_id}"
|
||||
|
||||
|
||||
def get_query_job_configs(client, live_table, date, dry_run, slices, priority):
|
||||
def get_query_job_configs(
|
||||
client, live_table, date, dry_run, slices, priority, preceding_days
|
||||
):
|
||||
sql = QUERY_TEMPLATE.format(live_table=live_table)
|
||||
stable_table = f"{live_table.replace('_live.', '_stable.', 1)}${date:%Y%m%d}"
|
||||
kwargs = dict(use_legacy_sql=False, dry_run=dry_run, priority=priority)
|
||||
|
@ -232,6 +246,9 @@ def get_query_job_configs(client, live_table, date, dry_run, slices, priority):
|
|||
bigquery.ScalarQueryParameter(
|
||||
"end_time", "TIMESTAMP", params[i + 1]
|
||||
),
|
||||
bigquery.ScalarQueryParameter(
|
||||
"num_preceding_days", "INT64", preceding_days
|
||||
),
|
||||
],
|
||||
**kwargs,
|
||||
),
|
||||
|
@ -254,6 +271,9 @@ def get_query_job_configs(client, live_table, date, dry_run, slices, priority):
|
|||
bigquery.ScalarQueryParameter(
|
||||
"end_time", "TIMESTAMP", end_time
|
||||
),
|
||||
bigquery.ScalarQueryParameter(
|
||||
"num_preceding_days", "INT64", preceding_days
|
||||
),
|
||||
],
|
||||
**kwargs,
|
||||
),
|
||||
|
@ -364,6 +384,7 @@ def main():
|
|||
args.dry_run,
|
||||
args.slices,
|
||||
args.priority,
|
||||
args.preceding_days,
|
||||
)
|
||||
for live_table in live_tables
|
||||
],
|
||||
|
|
Loading…
Reference in new issue