Support many billing projects and dates in copy_deduplicate (#426)

* Support many billing projects and dates in copy_deduplicate

* fix docs for --project_id

* explain default --billing_projects behavior

* Fix return value bug
Daniel Thorn 2019-10-21 11:16:24 -07:00 committed by GitHub
Parent dae3f709e4
Commit 0f433f6a91
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
1 changed file with 77 additions and 37 deletions
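The two options added here both take one or more values. A minimal standalone sketch of the resulting parsing behavior, reproducing only the two arguments touched by this change (the parser below is illustrative, not the script's full argument set, and "proj-a"/"proj-b" are placeholder project names):

from argparse import ArgumentParser
from datetime import datetime

parser = ArgumentParser()
# --date remains usable as an alias; both spellings now accept one or more values
parser.add_argument(
    "--dates",
    "--date",
    nargs="+",
    required=True,
    type=lambda d: datetime.strptime(d, "%Y-%m-%d").date(),
)
# the default is a single None entry, i.e. one client on the google cloud sdk default project
parser.add_argument(
    "--billing_projects",
    "--billing-projects",
    "--billing_project",
    "--billing-project",
    nargs="+",
    default=[None],
)

args = parser.parse_args(
    ["--date", "2019-01-01", "2019-01-02", "--billing-project", "proj-a", "proj-b"]
)
print(args.dates)             # [datetime.date(2019, 1, 1), datetime.date(2019, 1, 2)]
print(args.billing_projects)  # ['proj-a', 'proj-b']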


@@ -11,9 +11,11 @@ or to process only a specific list of tables.
 """
 from argparse import ArgumentParser
+from contextlib import contextmanager
 from datetime import datetime, timedelta
 from itertools import groupby
 from multiprocessing.pool import ThreadPool
+from queue import Queue
 from uuid import uuid4
 import fnmatch
@@ -78,13 +80,15 @@ parser.add_argument(
     "--project_id",
     "--project-id",
     default="moz-fx-data-shar-nonprod-efed",
-    help="ID of the project in which to run query jobs",
+    help="ID of the project in which to find tables",
 )
 parser.add_argument(
+    "--dates",
     "--date",
+    nargs="+",
     required=True,
     type=lambda d: datetime.strptime(d, "%Y-%m-%d").date(),
-    help="Which day's data to copy, in format 2019-01-01",
+    help="One or more days of data to copy, in format 2019-01-01",
 )
 parser.add_argument(
     "--parallelism",
@@ -117,7 +121,7 @@ parser.add_argument(
     default=1,
     help=(
         "Number of queries to split deduplicate over, each handling an equal-size time "
-        "slice of the date; avoids memory overflow at the cost of less effective"
+        "slice of the date; avoids memory overflow at the cost of less effective "
         "clustering; recommended only for tables failing due to memory overflow"
     ),
 )
@@ -135,6 +139,19 @@ parser.add_argument(
     default=0,
     help="Number of days preceding --date that should be used to filter out duplicates",
 )
+parser.add_argument(
+    "--billing_projects",
+    "--billing-projects",
+    "--billing_project",
+    "--billing-project",
+    nargs="+",
+    # default to one client, with project=None for the google cloud sdk default project
+    default=[None],
+    help=(
+        "One or more billing projects over which bigquery jobs should be distributed; "
+        "if not specified use the google cloud sdk default project"
+    ),
+)
 group = parser.add_mutually_exclusive_group()
 group.add_argument(
     "--only",
@@ -224,7 +241,6 @@ def get_query_job_configs(
         ]  # explicitly use end_time to avoid rounding errors
         return [
             (
-                client,
                 sql,
                 stable_table,
                 bigquery.QueryJobConfig(
@@ -258,7 +274,6 @@ def get_query_job_configs(
     else:
         return [
             (
-                client,
                 sql,
                 stable_table,
                 bigquery.QueryJobConfig(
@@ -358,47 +373,72 @@ def list_live_tables(client, pool, project_id, except_tables, only_tables):
     return [f"{project_id}.{t}" for t in live_tables]
+class ClientQueue:
+    """Queue for balancing jobs across billing projects."""
+    def __init__(self, billing_projects, parallelism):
+        clients = [bigquery.Client(project) for project in billing_projects]
+        self._q = Queue(parallelism)
+        for i in range(parallelism):
+            self._q.put(clients[i % len(clients)])
+    @contextmanager
+    def client(self):
+        client = self._q.get_nowait()
+        try:
+            yield client
+        finally:
+            self._q.put_nowait(client)
+    def with_client(self, func, *args):
+        with self.client() as client:
+            return func(client, *args)
 def main():
     args = parser.parse_args()
-    client = bigquery.Client()
+    # create a queue for balancing load across projects
+    client_q = ClientQueue(args.billing_projects, args.parallelism)
     with ThreadPool(args.parallelism) as pool:
-        live_tables = list_live_tables(
-            client=client,
-            pool=pool,
-            project_id=args.project_id,
-            except_tables=args.except_tables,
-            only_tables=args.only_tables,
-        )
-        job_args = [
-            args
-            for jobs in pool.starmap(
-                get_query_job_configs,
-                [
-                    (
-                        client,
-                        live_table,
-                        args.date,
-                        args.dry_run,
-                        args.slices,
-                        args.priority,
-                        args.preceding_days,
-                    )
-                    for live_table in live_tables
-                ],
+        with client_q.client() as client:
+            live_tables = list_live_tables(
+                client=client,
+                pool=pool,
+                project_id=args.project_id,
+                except_tables=args.except_tables,
+                only_tables=args.only_tables,
            )
-            for args in jobs
-        ]
-        # preserve job_args order so results stay sorted by stable_table for groupby
-        results = pool.starmap(run_deduplication_query, job_args, chunksize=1)
-        copy_args = [
-            (client, stable_table, [query_job for _, query_job in group])
+            query_jobs = [
+                (run_deduplication_query, *args)
+                for jobs in pool.starmap(
+                    get_query_job_configs,
+                    [
+                        (
+                            client,  # only use one client to create temp tables
+                            live_table,
+                            date,
+                            args.dry_run,
+                            args.slices,
+                            args.priority,
+                            args.preceding_days,
+                        )
+                        for live_table in live_tables
+                        for date in args.dates
+                    ],
+                )
+                for args in jobs
+            ]
+        # preserve query_jobs order so results stay sorted by stable_table for groupby
+        results = pool.starmap(client_q.with_client, query_jobs, chunksize=1)
+        copy_jobs = [
+            (copy_join_parts, stable_table, [query_job for _, query_job in group])
             for stable_table, group in groupby(results, key=lambda result: result[0])
         ]
-        pool.starmap(copy_join_parts, copy_args)
+        pool.starmap(client_q.with_client, copy_jobs, chunksize=1)
 if __name__ == "__main__":
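
The ClientQueue added above spreads BigQuery jobs round-robin across the configured billing projects: each of the parallelism queue slots holds a client, with_client checks one out for the duration of a single job and puts it back afterwards, so at most parallelism jobs run at once and work cycles through the projects. A rough standalone sketch of the same pattern, with plain strings standing in for bigquery.Client instances (run_job, "project-a" and "project-b" are placeholders) so it runs without GCP credentials:

from contextlib import contextmanager
from multiprocessing.pool import ThreadPool
from queue import Queue


class ClientQueue:
    """Round-robin queue of clients; mirrors the class in this diff."""

    def __init__(self, clients, parallelism):
        self._q = Queue(parallelism)
        for i in range(parallelism):
            # assign clients to queue slots round-robin
            self._q.put(clients[i % len(clients)])

    @contextmanager
    def client(self):
        client = self._q.get_nowait()
        try:
            yield client
        finally:
            self._q.put_nowait(client)

    def with_client(self, func, *args):
        # check out a client only for the duration of one job
        with self.client() as client:
            return func(client, *args)


def run_job(client, table):
    # stand-in for run_deduplication_query / copy_join_parts
    return f"{table} handled by {client}"


if __name__ == "__main__":
    client_q = ClientQueue(["project-a", "project-b"], parallelism=4)
    jobs = [(run_job, f"table_{i}") for i in range(8)]
    with ThreadPool(4) as pool:
        print(pool.starmap(client_q.with_client, jobs, chunksize=1))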