From f8fa9ef3a4731231ecde4eb22ac2b1b6b79b0485 Mon Sep 17 00:00:00 2001
From: Anna Scholtz
Date: Mon, 28 Oct 2024 12:02:45 -0700
Subject: [PATCH] Speed up view deploys by using processing pool (#6401)

---
 bigquery_etl/cli/view.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/bigquery_etl/cli/view.py b/bigquery_etl/cli/view.py
index 7ee7d90b56..fd5b39f431 100644
--- a/bigquery_etl/cli/view.py
+++ b/bigquery_etl/cli/view.py
@@ -5,6 +5,7 @@ import re
 import string
 import sys
 from fnmatch import fnmatchcase
+from functools import partial
 from graphlib import TopologicalSorter
 from multiprocessing.pool import Pool, ThreadPool
 from traceback import print_exc
@@ -207,9 +208,11 @@ def publish(
     for view in views:
         view.labels["managed"] = ""
     if not force:
+        has_changes = partial(_view_has_changes, target_project)
+
         # only views with changes
-        with ThreadPool(parallelism) as p:
-            changes = p.map(lambda v: v.has_changes(target_project), views, chunksize=1)
+        with Pool(parallelism) as p:
+            changes = p.map(has_changes, views)
         views = [v for v, has_changes in zip(views, changes) if has_changes]
 
     views_by_id = {v.view_identifier: v for v in views}
@@ -220,9 +223,10 @@ def publish(
         for view in views
     }
 
-    client = bigquery.Client()
     view_id_order = TopologicalSorter(view_id_graph).static_order()
 
+    client = bigquery.Client()
+
     result = []
     for view_id in view_id_order:
         try:
@@ -238,6 +242,10 @@ def publish(
     click.echo("All have been published.")
 
 
+def _view_has_changes(target_project, view):
+    return view.has_changes(target_project)
+
+
 def _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized):
     view_files = paths_matching_name_pattern(
         name, sql_dir, project_id, files=("view.sql",)