Pass ID token to dryrun instances to speed things up (#6019)

* Pass ID token to dryrun instances to speed things up

* Parallelize metadata and dependency generation

* Use table schema from dryrun function
Anna Scholtz 2024-08-08 12:38:43 -07:00 committed by GitHub
Parent d45b6f40cc
Commit 7024b6976e
No known key found for this signature
GPG key ID: B5690EEEBB952194
21 changed files with 294 additions and 138 deletions
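Note: the core change is to fetch Google Cloud credentials and an ID token once and hand them to every DryRun instance (and BigQuery client), instead of letting each dry run perform its own OAuth refresh. The sketch below illustrates that pattern; it is a minimal sketch, not the project's exact code. The helpers mirror the get_credentials()/get_id_token() functions added to bigquery_etl/dryrun.py in this commit, and DRY_RUN_URL is a placeholder for the value normally read from ConfigLoader.

# Minimal sketch of the token-reuse pattern (not the project's exact code).
# Assumes google-auth is installed and application-default credentials exist;
# DRY_RUN_URL stands in for ConfigLoader.get("dry_run", "function").
import google.auth
from google.auth.transport.requests import Request as GoogleAuthRequest
from google.oauth2.id_token import fetch_id_token

DRY_RUN_URL = "https://example.cloudfunctions.net/bigquery-etl-dryrun"  # placeholder


def get_credentials(auth_req=None):
    """Fetch and refresh application-default credentials once."""
    auth_req = auth_req or GoogleAuthRequest()
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    credentials.refresh(auth_req)
    return credentials


def get_id_token(audience=DRY_RUN_URL, credentials=None):
    """Get an ID token for the dry-run Cloud Function, reusing credentials."""
    auth_req = GoogleAuthRequest()
    credentials = credentials or get_credentials(auth_req)
    if hasattr(credentials, "id_token"):
        # gcloud user credentials carry an ID token after refresh
        return credentials.id_token
    # service-account credentials need an explicit ID-token fetch for the audience
    return fetch_id_token(auth_req, audience)


# Fetch once, then pass the same credentials/id_token to every dry run, e.g.
#   DryRun(sql_file, credentials=credentials, id_token=id_token)
credentials = get_credentials()
id_token = get_id_token(credentials=credentials)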

View file

@ -310,7 +310,7 @@ jobs:
name: PyTest Integration Test
# yamllint disable rule:line-length
command: |
PATH="venv/bin:$PATH" script/entrypoint -m 'integration' -n 8
PATH="venv/bin:$PATH" script/entrypoint -m 'integration'
# workaround for job failing with `Too long with no output (exceeded 10m0s): context deadline exceeded` error
no_output_timeout: 30m
- unless:

View file

@ -10,11 +10,10 @@ from multiprocessing.pool import Pool
from typing import List, Set, Tuple
import rich_click as click
from google.cloud import bigquery
from ..cli.utils import is_authenticated
from ..config import ConfigLoader
from ..dryrun import DryRun
from ..dryrun import DryRun, get_credentials, get_id_token
@click.command(
@ -108,8 +107,16 @@ def dryrun(
)
sys.exit(1)
credentials = get_credentials()
id_token = get_id_token(credentials=credentials)
sql_file_valid = partial(
_sql_file_valid, use_cloud_function, project, respect_skip, validate_schemas
_sql_file_valid,
use_cloud_function,
respect_skip,
validate_schemas,
credentials=credentials,
id_token=id_token,
)
with Pool(8) as p:
@ -126,19 +133,15 @@ def dryrun(
def _sql_file_valid(
use_cloud_function, project, respect_skip, validate_schemas, sqlfile
use_cloud_function, respect_skip, validate_schemas, sqlfile, credentials, id_token
) -> Tuple[bool, str]:
if not use_cloud_function:
client = bigquery.Client(project=project)
else:
client = None
"""Dry run the SQL file."""
result = DryRun(
sqlfile,
use_cloud_function=use_cloud_function,
client=client,
credentials=credentials,
respect_skip=respect_skip,
id_token=id_token,
)
if validate_schemas:
try:

View file

@ -2,6 +2,7 @@
import rich_click as click
from bigquery_etl.cli.utils import parallelism_option
from bigquery_etl.format_sql.format import format as format_sql
@ -33,6 +34,7 @@ from bigquery_etl.format_sql.format import format as format_sql
" return code 0 indicates nothing would change;"
" return code 1 indicates some files would be reformatted",
)
def format(paths, check):
@parallelism_option()
def format(paths, check, parallelism):
"""Apply formatting to SQL files."""
format_sql(paths, check=check)
format_sql(paths, check=check, parallelism=parallelism)

View file

@ -47,7 +47,7 @@ from ..cli.utils import (
from ..config import ConfigLoader
from ..dependency import get_dependency_graph
from ..deploy import FailedDeployException, SkippedDeployException, deploy_table
from ..dryrun import DryRun
from ..dryrun import DryRun, get_credentials, get_id_token
from ..format_sql.format import skip_format
from ..format_sql.formatter import reformat
from ..metadata import validate_metadata
@ -1773,6 +1773,9 @@ def update(
except FileNotFoundError:
query_file_graph[query_file] = []
credentials = get_credentials()
id_token = get_id_token(credentials=credentials)
ts = ParallelTopologicalSorter(
query_file_graph, parallelism=parallelism, with_follow_up=update_downstream
)
@ -1788,6 +1791,8 @@ def update(
respect_dryrun_skip,
update_downstream,
is_init=is_init,
credentials=credentials,
id_token=id_token,
)
)
@ -1810,6 +1815,8 @@ def _update_query_schema_with_downstream(
query_file=None,
follow_up_queue=None,
is_init=False,
credentials=None,
id_token=None,
):
try:
changed = _update_query_schema(
@ -1821,6 +1828,8 @@ def _update_query_schema_with_downstream(
use_cloud_function,
respect_dryrun_skip,
is_init,
credentials,
id_token,
)
if update_downstream:
@ -1871,6 +1880,8 @@ def _update_query_schema(
use_cloud_function=True,
respect_dryrun_skip=True,
is_init=False,
credentials=None,
id_token=None,
):
"""
Update the schema of a specific query file.
@ -1965,6 +1976,8 @@ def _update_query_schema(
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
sql_dir=sql_dir,
credentials=credentials,
id_token=id_token,
)
except Exception:
if not existing_schema_path.exists():
@ -1980,7 +1993,7 @@ def _update_query_schema(
# update bigquery metadata
try:
client = bigquery.Client()
client = bigquery.Client(credentials=credentials)
table = client.get_table(f"{project_name}.{dataset_name}.{table_name}")
metadata_file_path = query_file_path.parent / METADATA_FILE
@ -2026,22 +2039,14 @@ def _update_query_schema(
)
return
partitioned_by = None
try:
metadata = Metadata.of_query_file(query_file_path)
if metadata.bigquery and metadata.bigquery.time_partitioning:
partitioned_by = metadata.bigquery.time_partitioning.field
except FileNotFoundError:
pass
table_schema = Schema.for_table(
project_name,
dataset_name,
table_name,
partitioned_by,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
credentials=credentials,
id_token=id_token,
)
changed = True
@ -2153,6 +2158,9 @@ def deploy(
if not query_files:
raise click.ClickException(f"No queries matching `{name}` were found.")
credentials = get_credentials()
id_token = get_id_token(credentials=credentials)
_deploy = partial(
deploy_table,
destination_table=destination_table,
@ -2161,6 +2169,8 @@ def deploy(
skip_existing=skip_existing,
respect_dryrun_skip=respect_dryrun_skip,
sql_dir=sql_dir,
credentials=credentials,
id_token=id_token,
)
failed_deploys, skipped_deploys = [], []
@ -2184,7 +2194,7 @@ def deploy(
if not skip_external_data:
failed_external_deploys = _deploy_external_data(
name, sql_dir, project_id, skip_existing
name, sql_dir, project_id, skip_existing, credentials=credentials
)
failed_deploys += failed_external_deploys
@ -2201,17 +2211,14 @@ def deploy(
def _deploy_external_data(
name,
sql_dir,
project_id,
skip_existing,
name, sql_dir, project_id, skip_existing, credentials
) -> list:
"""Publish external data tables."""
# whether a table should be created from external data is defined in the metadata
metadata_files = paths_matching_name_pattern(
name, sql_dir, project_id, ["metadata.yaml"]
)
client = bigquery.Client()
client = bigquery.Client(credentials=credentials)
failed_deploys = []
for metadata_file_path in metadata_files:
metadata = Metadata.from_file(metadata_file_path)
@ -2285,7 +2292,11 @@ def _deploy_external_data(
def _validate_schema_from_path(
query_file_path, use_cloud_function=True, respect_dryrun_skip=True
query_file_path,
use_cloud_function=True,
respect_dryrun_skip=True,
credentials=None,
id_token=None,
):
"""Dry Runs and validates a query schema from its path."""
return (
@ -2293,6 +2304,8 @@ def _validate_schema_from_path(
query_file_path,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
credentials=credentials,
id_token=id_token,
).validate_schema(),
query_file_path,
)
@ -2335,10 +2348,15 @@ def validate_schema(
if query_files == []:
raise click.ClickException(f"No queries matching `{name}` were found.")
credentials = get_credentials()
id_token = get_id_token(credentials=credentials)
_validate_schema = partial(
_validate_schema_from_path,
use_cloud_function=use_cloud_function,
respect_dryrun_skip=respect_dryrun_skip,
credentials=credentials,
id_token=id_token,
)
with Pool(8) as p:

View file

@ -15,7 +15,7 @@ from ..cli.query import update as update_query_schema
from ..cli.routine import publish as publish_routine
from ..cli.utils import paths_matching_name_pattern, sql_dir_option
from ..cli.view import publish as publish_view
from ..dryrun import DryRun
from ..dryrun import DryRun, get_id_token
from ..routine.parse_routine import (
ROUTINE_FILES,
UDF_FILE,
@ -261,13 +261,14 @@ def _view_dependencies(artifact_files, sql_dir):
"""Determine view dependencies."""
view_dependencies = set()
view_dependency_files = [file for file in artifact_files if file.name == VIEW_FILE]
id_token = get_id_token()
for dep_file in view_dependency_files:
# all references views and tables need to be deployed in the same stage project
if dep_file not in artifact_files:
view_dependencies.add(dep_file)
if dep_file.name == VIEW_FILE:
view = View.from_file(dep_file)
view = View.from_file(dep_file, id_token=id_token)
for dependency in view.table_references:
dependency_components = dependency.split(".")
@ -302,12 +303,11 @@ def _view_dependencies(artifact_files, sql_dir):
path.mkdir(parents=True, exist_ok=True)
# don't create schema for wildcard and metadata tables
if "*" not in name and name != "INFORMATION_SCHEMA":
partitioned_by = "submission_timestamp"
schema = Schema.for_table(
project=project,
dataset=dataset,
table=name,
partitioned_by=partitioned_by,
id_token=id_token,
)
schema.to_yaml_file(path / SCHEMA_FILE)

View file

@ -10,6 +10,7 @@ from multiprocessing.pool import Pool, ThreadPool
from traceback import print_exc
import rich_click as click
from google.cloud import bigquery
from ..cli.utils import (
parallelism_option,
@ -19,7 +20,7 @@ from ..cli.utils import (
sql_dir_option,
)
from ..config import ConfigLoader
from ..dryrun import DryRun
from ..dryrun import DryRun, get_id_token
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..util.bigquery_id import sql_table_id
from ..util.client_queue import ClientQueue
@ -102,7 +103,8 @@ def validate(
view_files = paths_matching_name_pattern(
name, sql_dir, project_id, files=("view.sql",)
)
views = [View.from_file(f) for f in view_files]
id_token = get_id_token()
views = [View.from_file(f, id_token=id_token) for f in view_files]
with Pool(parallelism) as p:
result = p.map(_view_is_valid, views)
@ -218,12 +220,13 @@ def publish(
for view in views
}
client = bigquery.Client()
view_id_order = TopologicalSorter(view_id_graph).static_order()
result = []
for view_id in view_id_order:
try:
result.append(views_by_id[view_id].publish(target_project, dry_run))
result.append(views_by_id[view_id].publish(target_project, dry_run, client))
except Exception:
print(f"Failed to publish view: {view_id}")
print_exc()
@ -239,8 +242,9 @@ def _collect_views(name, sql_dir, project_id, user_facing_only, skip_authorized)
view_files = paths_matching_name_pattern(
name, sql_dir, project_id, files=("view.sql",)
)
id_token = get_id_token()
views = [View.from_file(f) for f in view_files]
views = [View.from_file(f, id_token=id_token) for f in view_files]
if user_facing_only:
views = [v for v in views if v.is_user_facing]
if skip_authorized:

View file

@ -4,6 +4,7 @@ import re
import sys
from glob import glob
from itertools import groupby
from multiprocessing.pool import ThreadPool
from pathlib import Path
from subprocess import CalledProcessError
from typing import Dict, Iterator, List, Tuple
@ -223,16 +224,25 @@ def show(paths: Tuple[str, ...], without_views: bool):
is_flag=True,
help="Skip files with existing references rather than failing",
)
def record(paths: Tuple[str, ...], skip_existing):
@click.option(
"--parallelism",
"-p",
default=8,
type=int,
help="Number of threads for parallel processing",
)
def record(paths: Tuple[str, ...], skip_existing, parallelism):
"""Record table references in metadata."""
for parent, group in groupby(_get_references(paths), lambda e: e[0].parent):
def _record_dependencies(iter):
parent, group = iter
references = {
path.name: table_references
for path, table_references in group
if table_references
}
if not references:
continue
return
with open(parent / "metadata.yaml", "a+") as f:
f.seek(0)
metadata = yaml.safe_load(f)
@ -243,7 +253,12 @@ def record(paths: Tuple[str, ...], skip_existing):
elif "references" in metadata:
if skip_existing:
# Continue without modifying metadata.yaml
continue
return
raise click.ClickException(f"{f.name} already contains references")
f.write("\n# Generated by bigquery_etl.dependency\n")
f.write(yaml.dump({"references": references}))
with ThreadPool(parallelism) as pool:
pool.map(
_record_dependencies, groupby(_get_references(paths), lambda e: e[0].parent)
)

View file

@ -8,7 +8,7 @@ from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from .config import ConfigLoader
from .dryrun import DryRun
from .dryrun import DryRun, get_id_token
from .metadata.parse_metadata import Metadata
from .metadata.publish_metadata import attach_metadata
from .schema import SCHEMA_FILE, Schema
@ -33,6 +33,8 @@ def deploy_table(
update_metadata: bool = True,
respect_dryrun_skip: bool = True,
sql_dir=ConfigLoader.get("default", "sql_dir"),
credentials=None,
id_token=None,
) -> None:
"""Deploy a query to a destination."""
if respect_dryrun_skip and str(query_file) in DryRun.skipped_files():
@ -64,12 +66,15 @@ def deploy_table(
except Exception as e: # TODO: Raise/catch more specific exception
raise SkippedDeployException(f"Schema missing for {query_file}.") from e
client = bigquery.Client(credentials=credentials)
if not force and str(query_file).endswith("query.sql"):
query_schema = Schema.from_query_file(
query_file,
use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
sql_dir=sql_dir,
client=client,
id_token=id_token if not use_cloud_function or id_token else get_id_token(),
)
if not existing_schema.equal(query_schema):
raise FailedDeployException(
@ -80,7 +85,6 @@ def deploy_table(
f"{dataset_name}.{table_name}`",
)
client = bigquery.Client()
try:
table = client.get_table(destination_table)
except NotFound:

View file

@ -38,6 +38,31 @@ except ImportError:
from backports.cached_property import cached_property # type: ignore
def get_credentials(auth_req: Optional[GoogleAuthRequest] = None):
"""Get GCP credentials."""
auth_req = auth_req or GoogleAuthRequest()
credentials, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
credentials.refresh(auth_req)
return credentials
def get_id_token(dry_run_url=ConfigLoader.get("dry_run", "function"), credentials=None):
"""Get token to authenticate against Cloud Function."""
auth_req = GoogleAuthRequest()
credentials = credentials or get_credentials(auth_req)
if hasattr(credentials, "id_token"):
# Get token from default credentials for the current environment created via Cloud SDK run
id_token = credentials.id_token
else:
# If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
# then ID token is acquired using this service account credentials.
id_token = fetch_id_token(auth_req, dry_run_url)
return id_token
class Errors(Enum):
"""DryRun errors that require special handling."""
@ -58,16 +83,30 @@ class DryRun:
client=None,
respect_skip=True,
sql_dir=ConfigLoader.get("default", "sql_dir"),
id_token=None,
credentials=None,
project=None,
dataset=None,
table=None,
):
"""Instantiate DryRun class."""
self.sqlfile = sqlfile
self.content = content
self.strip_dml = strip_dml
self.use_cloud_function = use_cloud_function
self.client = client if use_cloud_function or client else bigquery.Client()
self.bq_client = client
self.respect_skip = respect_skip
self.dry_run_url = ConfigLoader.get("dry_run", "function")
self.sql_dir = sql_dir
self.id_token = (
id_token
if not use_cloud_function or id_token
else get_id_token(self.dry_run_url)
)
self.credentials = credentials
self.project = project
self.dataset = dataset
self.table = table
try:
self.metadata = Metadata.of_query_file(self.sqlfile)
except FileNotFoundError:
@ -82,6 +121,13 @@ class DryRun:
)
sys.exit(1)
@cached_property
def client(self):
"""Get BigQuery client instance."""
if self.use_cloud_function:
return None
return self.bq_client or bigquery.Client(credentials=self.credentials)
@staticmethod
def skipped_files(sql_dir=ConfigLoader.get("default", "sql_dir")) -> Set[str]:
"""Return files skipped by dry run."""
@ -184,33 +230,23 @@ class DryRun:
dataset = basename(dirname(dirname(self.sqlfile)))
try:
if self.use_cloud_function:
auth_req = GoogleAuthRequest()
creds, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
creds.refresh(auth_req)
if hasattr(creds, "id_token"):
# Get token from default credentials for the current environment created via Cloud SDK run
id_token = creds.id_token
else:
# If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
# then ID token is acquired using this service account credentials.
id_token = fetch_id_token(auth_req, self.dry_run_url)
json_data = {
"project": self.project or project,
"dataset": self.dataset or dataset,
"query": sql,
}
if self.table:
json_data["table"] = self.table
r = urlopen(
Request(
self.dry_run_url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {id_token}",
"Authorization": f"Bearer {self.id_token}",
},
data=json.dumps(
{
"project": project,
"dataset": dataset,
"query": sql,
}
).encode("utf8"),
data=json.dumps(json_data).encode("utf8"),
method="POST",
)
)
@ -240,6 +276,22 @@ class DryRun:
else:
raise e
if (
self.project is not None
and self.table is not None
and self.dataset is not None
):
table = self.client.get_table(
f"{self.project}.{self.dataset}.{self.table}"
)
table_metadata = {
"tableType": table.table_type,
"friendlyName": table.friendly_name,
"schema": {
"fields": [field.to_api_repr() for field in table.schema]
},
}
return {
"valid": True,
"referencedTables": [
@ -251,6 +303,7 @@ class DryRun:
.get("schema", {})
),
"datasetLabels": dataset_labels,
"tableMetadata": table_metadata,
}
except Exception as e:
print(f"{self.sqlfile!s:59} ERROR\n", e)
@ -294,7 +347,12 @@ class DryRun:
f"{self.get_sql()}WHERE {date_filter} > current_date()"
)
if (
DryRun(self.sqlfile, filtered_content).get_error()
DryRun(
self.sqlfile,
filtered_content,
client=self.client,
id_token=self.id_token,
).get_error()
== Errors.DATE_FILTER_NEEDED_AND_SYNTAX
):
# If the date filter (e.g. WHERE crash_date > current_date())
@ -310,14 +368,24 @@ class DryRun:
f"{self.get_sql()}WHERE {date_filter} > current_timestamp()"
)
if (
DryRun(sqlfile=self.sqlfile, content=filtered_content).get_error()
DryRun(
sqlfile=self.sqlfile,
content=filtered_content,
client=self.client,
id_token=self.id_token,
).get_error()
== Errors.DATE_FILTER_NEEDED_AND_SYNTAX
):
filtered_content = (
f"{self.get_sql()}AND {date_filter} > current_timestamp()"
)
stripped_dml_result = DryRun(sqlfile=self.sqlfile, content=filtered_content)
stripped_dml_result = DryRun(
sqlfile=self.sqlfile,
content=filtered_content,
client=self.client,
id_token=self.id_token,
)
if (
stripped_dml_result.get_error() is None
and "referencedTables" in stripped_dml_result.dry_run_result
@ -344,6 +412,24 @@ class DryRun:
return {}
def get_table_schema(self):
"""Return the schema of the provided table."""
if not self.skip() and not self.is_valid():
raise Exception(f"Error when dry running SQL file {self.sqlfile}")
if self.skip():
print(f"\t...Ignoring dryrun results for {self.sqlfile}")
return {}
if (
self.dry_run_result
and self.dry_run_result["valid"]
and "tableMetadata" in self.dry_run_result
):
return self.dry_run_result["tableMetadata"]["schema"]
return []
def get_dataset_labels(self):
"""Return the labels on the default dataset by dry running the SQL file."""
if not self.skip() and not self.is_valid():
@ -446,16 +532,12 @@ class DryRun:
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name
partitioned_by = None
if (
self.metadata
and self.metadata.bigquery
and self.metadata.bigquery.time_partitioning
):
partitioned_by = self.metadata.bigquery.time_partitioning.field
table_schema = Schema.for_table(
project_name, dataset_name, table_name, partitioned_by
project_name,
dataset_name,
table_name,
client=self.client,
id_token=self.id_token,
)
# This check relies on the new schema being deployed to prod

View file

@ -4,6 +4,8 @@ import glob
import os
import os.path
import sys
from functools import partial
from multiprocessing.pool import Pool
from pathlib import Path
from bigquery_etl.config import ConfigLoader
@ -31,7 +33,31 @@ def skip_qualifying_references():
]
def format(paths, check=False):
def _format_path(check, path):
query = Path(path).read_text()
try:
if not any([path.endswith(s) for s in skip_qualifying_references()]):
fully_referenced_query = qualify_table_references_in_file(Path(path))
else:
fully_referenced_query = query
except NotImplementedError:
fully_referenced_query = query # not implemented for scripts
formatted = reformat(fully_referenced_query, trailing_newline=True)
if query != formatted:
if check:
print(f"Needs reformatting: bqetl format {path}")
else:
with open(path, "w") as fp:
fp.write(formatted)
print(f"Reformatted: {path}")
return 1
else:
return 0
def format(paths, check=False, parallelism=8):
"""Format SQL files."""
if not paths:
query = sys.stdin.read()
@ -60,32 +86,12 @@ def format(paths, check=False):
if not sql_files:
print("Error: no files were found to format")
sys.exit(255)
sql_files.sort()
reformatted = unchanged = 0
for path in sql_files:
query = Path(path).read_text()
try:
if not any([path.endswith(s) for s in skip_qualifying_references()]):
fully_referenced_query = qualify_table_references_in_file(
Path(path)
)
else:
fully_referenced_query = query
except NotImplementedError:
fully_referenced_query = query # not implemented for scripts
with Pool(parallelism) as pool:
result = pool.map(partial(_format_path, check), sql_files)
formatted = reformat(fully_referenced_query, trailing_newline=True)
if query != formatted:
if check:
print(f"Needs reformatting: bqetl format {path}")
else:
with open(path, "w") as fp:
fp.write(formatted)
print(f"Reformatted: {path}")
reformatted += 1
else:
unchanged += 1
reformatted = sum(result)
unchanged = len(sql_files) - reformatted
print(
", ".join(
f"{number} file{'s' if number > 1 else ''}"

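The format refactor above follows a common pattern: hoist the per-file work into a module-level function so it can be pickled, then fan it out with multiprocessing.Pool and functools.partial. A generic sketch of that pattern follows; the function and variable names here are illustrative, not the project's.

# Generic sketch of the Pool + partial fan-out used for formatting above.
# _format_one is a stand-in for the real per-file work.
from functools import partial
from multiprocessing.pool import Pool


def _format_one(check, path):
    """Format a single file; return 1 if it was (or would be) reformatted."""
    # ... read the file, reformat, optionally write it back ...
    return 0


def format_files(paths, check=False, parallelism=8):
    """Run the per-file worker across a pool and summarize the results."""
    with Pool(parallelism) as pool:
        results = pool.map(partial(_format_one, check), sorted(paths))
    reformatted = sum(results)
    print(f"{reformatted} reformatted, {len(results) - reformatted} unchanged")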
View file

@ -58,21 +58,19 @@ class Schema:
return cls(json_schema)
@classmethod
def for_table(cls, project, dataset, table, partitioned_by=None, *args, **kwargs):
def for_table(cls, project, dataset, table, *args, **kwargs):
"""Get the schema for a BigQuery table."""
query = f"SELECT * FROM `{project}.{dataset}.{table}`"
if partitioned_by:
query += f" WHERE DATE(`{partitioned_by}`) = DATE('2020-01-01')"
try:
return cls(
dryrun.DryRun(
os.path.join(project, dataset, table, "query.sql"),
query,
"SELECT 1", # placeholder query
project=project,
dataset=dataset,
table=table,
*args,
**kwargs,
).get_schema()
).get_table_schema()
)
except Exception as e:
print(f"Cannot get schema for {project}.{dataset}.{table}: {e}")

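With this change Schema.for_table no longer dry-runs a SELECT * query; it passes project/dataset/table through to DryRun, whose Cloud Function response now includes the table's metadata and schema. A hedged usage sketch follows: the project, dataset, and table names are hypothetical, while the call shape follows this diff.

# Hypothetical usage of the reworked Schema.for_table: the schema comes from
# the dry-run function's tableMetadata instead of a SELECT * dry run.
from bigquery_etl.dryrun import get_credentials, get_id_token
from bigquery_etl.schema import Schema

credentials = get_credentials()
id_token = get_id_token(credentials=credentials)

schema = Schema.for_table(
    "moz-fx-data-shared-prod",  # project (example)
    "telemetry_derived",        # dataset (example)
    "clients_daily_v6",         # table (example)
    credentials=credentials,
    id_token=id_token,
)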
View file

@ -8,7 +8,7 @@ import time
from functools import cached_property
from pathlib import Path
from textwrap import dedent
from typing import Optional
from typing import Any, Optional
import attr
import sqlparse
@ -42,6 +42,7 @@ class View:
dataset: str = attr.ib()
project: str = attr.ib()
partition_column: Optional[str] = attr.ib(None)
id_token: Optional[Any] = attr.ib(None)
@path.validator
def validate_path(self, attribute, value):
@ -219,7 +220,9 @@ class View:
WHERE {schema_query_filter}
"""
)
return Schema.from_query_file(Path(self.path), content=schema_query)
return Schema.from_query_file(
Path(self.path), content=schema_query, id_token=self.id_token
)
except Exception as e:
print(f"Error dry-running view {self.view_identifier} to get schema: {e}")
return None
@ -349,7 +352,7 @@ class View:
return False
def publish(self, target_project=None, dry_run=False):
def publish(self, target_project=None, dry_run=False, client=None):
"""
Publish this view to BigQuery.
@ -365,7 +368,7 @@ class View:
any(str(self.path).endswith(p) for p in self.skip_validation())
or self._valid_view_naming()
):
client = bigquery.Client()
client = client or bigquery.Client()
sql = self.content
target_view = self.target_view_identifier(target_project)

View file

@ -7,6 +7,7 @@ import click
from pathos.multiprocessing import ProcessingPool
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.dryrun import get_id_token
NON_USER_FACING_DATASET_SUBSTRINGS = (
"_derived",
@ -21,7 +22,7 @@ METADATA_FILE = "metadata.yaml"
SCHEMA_FILE = "schema.yaml"
def _generate_view_schema(sql_dir, view_directory):
def _generate_view_schema(sql_dir, view_directory, id_token=None):
import logging
from bigquery_etl.dependency import extract_table_references
@ -110,7 +111,9 @@ def _generate_view_schema(sql_dir, view_directory):
if reference_partition_column is None:
logging.debug("No reference partition column, dry running without one.")
view = View.from_file(view_file, partition_column=reference_partition_column)
view = View.from_file(
view_file, partition_column=reference_partition_column, id_token=id_token
)
# `View.schema` prioritizes the configured schema over the dryrun schema, but here
# we prioritize the dryrun schema because the `schema.yaml` file might be out of date.
@ -197,8 +200,11 @@ def generate(target_project, output_dir, parallelism, use_cloud_function):
)
]
id_token = get_id_token()
view_directories = []
for dataset_path in dataset_paths:
view_directories = [
view_directories += [
path
for path in dataset_path.iterdir()
if path.is_dir() and (path / VIEW_FILE).exists()
@ -206,9 +212,6 @@ def generate(target_project, output_dir, parallelism, use_cloud_function):
with ProcessingPool(parallelism) as pool:
pool.map(
partial(
_generate_view_schema,
Path(output_dir),
),
partial(_generate_view_schema, Path(output_dir), id_token=id_token),
view_directories,
)

View file

@ -12,6 +12,7 @@ from bigquery_etl.cli.utils import (
use_cloud_function_option,
)
from bigquery_etl.config import ConfigLoader
from bigquery_etl.dryrun import get_id_token
from sql_generators.glean_usage import (
baseline_clients_daily,
baseline_clients_first_seen,
@ -135,6 +136,8 @@ def generate(
not in ConfigLoader.get("generate", "glean_usage", "skip_apps", fallback=[])
]
id_token=get_id_token()
# Prepare parameters so that generation of all Glean datasets can be done in parallel
# Parameters to generate per-app_id datasets consist of the function to be called
@ -148,6 +151,7 @@ def generate(
use_cloud_function=use_cloud_function,
app_info=app_info,
parallelism=parallelism,
id_token=id_token
),
baseline_table,
)
@ -165,6 +169,7 @@ def generate(
output_dir=output_dir,
use_cloud_function=use_cloud_function,
parallelism=parallelism,
id_token=id_token
),
info,
)

View file

@ -24,6 +24,7 @@ class BaselineClientsFirstSeenTable(GleanTable):
use_cloud_function=True,
app_info=[],
parallelism=8,
id_token=None,
):
"""Generate per-app_id datasets."""
self.custom_render_kwargs = dict(
@ -47,4 +48,5 @@ class BaselineClientsFirstSeenTable(GleanTable):
output_dir=output_dir,
app_info=app_info,
parallelism=parallelism,
id_token=id_token
)

View file

@ -130,9 +130,9 @@ def table_names_from_baseline(baseline_table, include_project_id=True):
)
def referenced_table_exists(view_sql):
def referenced_table_exists(view_sql, id_token=None):
"""Dry run the given view SQL to see if its referent exists."""
dryrun = DryRun("foo/bar/view.sql", content=view_sql)
dryrun = DryRun("foo/bar/view.sql", content=view_sql, id_token=id_token)
# 403 is returned if referenced dataset doesn't exist; we need to check that the 403 is due to dataset not existing
# since dryruns on views will also return 403 due to the table CREATE
# 404 is returned if referenced table or view doesn't exist
@ -210,6 +210,7 @@ class GleanTable:
use_cloud_function=True,
app_info=[],
parallelism=8,
id_token=None
):
"""Generate the baseline table query per app_id."""
if not self.per_app_id_enabled:
@ -295,7 +296,7 @@ class GleanTable:
Artifact(table, "query.sql", query_sql),
]
if not (referenced_table_exists(view_sql)):
if not (referenced_table_exists(view_sql, id_token)):
logging.info("Skipping view for table which doesn't exist:" f" {table}")
else:
artifacts.append(Artifact(view, "view.sql", view_sql))
@ -338,6 +339,7 @@ class GleanTable:
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None
):
"""Generate the baseline table query per app_name."""
if not self.per_app_enabled:
@ -383,7 +385,7 @@ class GleanTable:
)
view = f"{project_id}.{target_dataset}.{target_view_name}"
if not (referenced_table_exists(sql)):
if not (referenced_table_exists(sql, id_token=id_token)):
logging.info("Skipping view for table which doesn't exist:" f" {view}")
return
@ -417,7 +419,7 @@ class GleanTable:
table = f"{project_id}.{target_dataset}_derived.{self.target_table_id}"
view = f"{project_id}.{target_dataset}.{target_view_name}"
if not (referenced_table_exists(query_sql)):
if not (referenced_table_exists(query_sql, id_token=id_token)):
logging.info(
"Skipping query for table which doesn't exist:"
f" {self.target_table_id}"

View file

@ -73,6 +73,7 @@ class EventMonitoringLive(GleanTable):
use_cloud_function=True,
app_info=[],
parallelism=8,
id_token=None
):
tables = table_names_from_baseline(baseline_table, include_project_id=False)

View file

@ -32,6 +32,7 @@ class EventsStreamTable(GleanTable):
use_cloud_function=True,
app_info=[],
parallelism=8,
id_token=None,
):
# Get the app ID from the baseline_table name.
# This is what `common.py` also does.
@ -50,7 +51,7 @@ class EventsStreamTable(GleanTable):
self.custom_render_kwargs = {"metrics_as_struct": metrics_as_struct}
super().generate_per_app_id(
project_id, baseline_table, output_dir, use_cloud_function, app_info
project_id, baseline_table, output_dir, use_cloud_function, app_info, id_token=id_token
)
def generate_per_app(
@ -60,6 +61,7 @@ class EventsStreamTable(GleanTable):
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None,
):
"""Generate the events_stream table query per app_name."""
target_dataset = app_info[0]["app_name"]
@ -68,4 +70,4 @@ class EventsStreamTable(GleanTable):
):
return
super().generate_per_app(project_id, app_info, output_dir)
super().generate_per_app(project_id, app_info, output_dir, id_token=id_token)

View file

@ -29,8 +29,9 @@ class EventsUnnestedTable(GleanTable):
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None,
):
"""Generate the events_unnested table query per app_name."""
target_dataset = app_info[0]["app_name"]
if target_dataset not in DATASET_SKIP:
super().generate_per_app(project_id, app_info, output_dir)
super().generate_per_app(project_id, app_info, output_dir, id_token=id_token)

View file

@ -59,6 +59,7 @@ class GleanAppPingViews(GleanTable):
output_dir=None,
use_cloud_function=True,
parallelism=8,
id_token=None
):
"""
Generate per-app ping views across channels.
@ -109,8 +110,8 @@ class GleanAppPingViews(GleanTable):
"moz-fx-data-shared-prod",
channel_dataset,
view_name,
partitioned_by="submission_timestamp",
use_cloud_function=use_cloud_function,
id_token=id_token
)
cached_schemas[channel_dataset] = deepcopy(schema)

View file

@ -19,6 +19,7 @@ from pathos.multiprocessing import ProcessingPool
from bigquery_etl.cli.utils import use_cloud_function_option
from bigquery_etl.schema.stable_table_schema import SchemaFile, get_stable_table_schemas
from bigquery_etl.dryrun import get_id_token
VIEW_QUERY_TEMPLATE = """\
-- Generated via ./bqetl generate stable_views
@ -121,7 +122,7 @@ def write_dataset_metadata_if_not_exists(
).write(target)
def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaFile):
def write_view_if_not_exists(target_project: str, sql_dir: Path, id_token=None, schema: SchemaFile = None):
"""If a view.sql does not already exist, write one to the target directory."""
# add imports here to run in multiple processes via pathos
import re
@ -309,7 +310,7 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
content = VIEW_CREATE_REGEX.sub("", target_file.read_text())
content += " WHERE DATE(submission_timestamp) = '2020-01-01'"
view_schema = Schema.from_query_file(
target_file, content=content, sql_dir=sql_dir
target_file, content=content, sql_dir=sql_dir, id_token=id_token
)
stable_table_schema = Schema.from_json({"fields": schema.schema})
@ -379,12 +380,15 @@ def generate(target_project, output_dir, log_level, parallelism, use_cloud_funct
last for k, (*_, last) in groupby(schemas, lambda t: t.bq_dataset_family)
]
id_token = get_id_token()
with ProcessingPool(parallelism) as pool:
pool.map(
partial(
write_view_if_not_exists,
target_project,
Path(output_dir),
id_token
),
schemas,
)