This commit is contained in:
Jeff Klukas 2019-07-31 14:11:22 -04:00
Parent 4242d95777
Commit 0bc42132a2
1 changed file with 17 additions and 3 deletions

View file

@ -13,6 +13,7 @@ or to process only a specific list of tables.
from argparse import ArgumentParser
from datetime import datetime
from fnmatch import fnmatch
from multiprocessing.pool import ThreadPool
from google.cloud import bigquery
@ -47,6 +48,12 @@ parser.add_argument(
type=lambda d: datetime.strptime(d, "%Y-%m-%d").date(),
help="Which day's data to copy, in format 2019-01-01",
)
parser.add_argument(
"--parallelism",
default=4,
type=int,
help="Maximum number of queries to execute concurrently",
)
parser.add_argument(
"--dry-run",
action="store_true",
@ -109,6 +116,10 @@ def run_deduplication_query(client, live_table, stable_table, date, dry_run):
)
def worker_entrypoint(args):
    """Adapter that lets ThreadPool.map dispatch one deduplication job.

    Each entry in job_args is a list of the positional arguments for
    run_deduplication_query; unpack it and forward the call.
    """
    client, live_table, stable_table, date, dry_run = args
    run_deduplication_query(client, live_table, stable_table, date, dry_run)
def main():
args = parser.parse_args()
@ -119,6 +130,8 @@ def main():
if d.dataset_id.endswith("_live")
]
job_args = []
for live_dataset in live_datasets:
stable_dataset_id = live_dataset.dataset_id[:-5] + "_stable"
for live_table in client.list_tables(live_dataset.reference):
@ -136,9 +149,10 @@ def main():
):
print(f"Skipping {live_table_spec} due to --only argument")
continue
run_deduplication_query(
client, live_table, stable_table, args.date, args.dry_run
)
job_args.append([client, live_table, stable_table, args.date, args.dry_run])
with ThreadPool(args.parallelism) as p:
p.map(worker_entrypoint, job_args, chunksize=1)
if __name__ == "__main__":