Refactor publish_udfs script
This commit is contained in:
Parent
2b29d24f59
Commit
0080ff8867
|
@ -10,14 +10,15 @@ from google.cloud import storage
|
|||
from bigquery_etl.util import standard_args
|
||||
from bigquery_etl.parse_udf import read_udf_dirs, accumulate_dependencies
|
||||
|
||||
DEFAULT_PROJECT_ID = "mozfun"
|
||||
DEFAULT_UDF_DIR = "mozfun/"
|
||||
DEFAULT_DEPENDENCY_DIR = "lib/"
|
||||
DEFAULT_GCS_BUCKET = "mozfun"
|
||||
DEFAULT_PROJECT_ID = "moz-fx-data-shared-prod"
|
||||
DEFAULT_UDF_DIR = ["udf/", "udf_js/"]
|
||||
DEFAULT_DEPENDENCY_DIR = "udf_js/lib/"
|
||||
DEFAULT_GCS_BUCKET = "moz-fx-data-prod-bigquery-etl"
|
||||
DEFAULT_GCS_PATH = ""
|
||||
|
||||
OPTIONS_LIB_RE = re.compile(r'library = "gs://[^"]+/([^"]+)"')
|
||||
|
||||
SKIP = {"udf/main_summary_scalars/udf.sql"}
|
||||
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
|
@ -27,8 +28,9 @@ parser.add_argument(
|
|||
help="Project to publish UDFs to",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--udf-dir",
|
||||
"--udf_dir",
|
||||
"--udf-dirs",
|
||||
"--udf_dirs",
|
||||
nargs="+",
|
||||
default=DEFAULT_UDF_DIR,
|
||||
help="Directory containing UDF definitions",
|
||||
)
|
||||
|
@ -50,6 +52,9 @@ parser.add_argument(
|
|||
default=DEFAULT_GCS_PATH,
|
||||
help="The GCS path in the bucket where dependency files are uploaded to.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--public", default=False, help="The published UDFs should be publicly accessible.",
|
||||
)
|
||||
standard_args.add_log_level(parser)
|
||||
|
||||
|
||||
|
@ -59,12 +64,12 @@ def main():
|
|||
|
||||
client = bigquery.Client(args.project_id)
|
||||
|
||||
if args.dependency_dir:
|
||||
if args.dependency_dir and os.path.exists(args.dependency_dir):
|
||||
push_dependencies_to_gcs(
|
||||
args.gcs_bucket, args.gcs_path, args.dependency_dir, args.project_id
|
||||
)
|
||||
|
||||
raw_udfs = read_udf_dirs(args.udf_dir)
|
||||
raw_udfs = read_udf_dirs(*args.udf_dir)
|
||||
|
||||
published_udfs = []
|
||||
|
||||
|
@ -74,7 +79,7 @@ def main():
|
|||
udfs_to_publish.append(raw_udf)
|
||||
|
||||
for dep in udfs_to_publish:
|
||||
if dep not in published_udfs:
|
||||
if dep not in published_udfs and raw_udfs[dep].filepath not in SKIP:
|
||||
publish_udf(
|
||||
raw_udfs[dep],
|
||||
client,
|
||||
|
@ -82,21 +87,25 @@ def main():
|
|||
args.gcs_bucket,
|
||||
args.gcs_path,
|
||||
raw_udfs.keys(),
|
||||
args.public,
|
||||
)
|
||||
published_udfs.append(dep)
|
||||
|
||||
|
||||
def publish_udf(raw_udf, client, project_id, gcs_bucket, gcs_path, known_udfs):
|
||||
def publish_udf(
|
||||
raw_udf, client, project_id, gcs_bucket, gcs_path, known_udfs, is_public
|
||||
):
|
||||
"""Publish a specific UDF to BigQuery."""
|
||||
# create new dataset for UDF if necessary
|
||||
dataset = client.create_dataset(raw_udf.dataset, exists_ok=True)
|
||||
if is_public:
|
||||
# create new dataset for UDF if necessary
|
||||
dataset = client.create_dataset(raw_udf.dataset, exists_ok=True)
|
||||
|
||||
# set permissions for dataset, public for everyone
|
||||
entry = bigquery.AccessEntry("READER", "specialGroup", "allAuthenticatedUsers")
|
||||
entries = list(dataset.access_entries)
|
||||
entries.append(entry)
|
||||
dataset.access_entries = entries
|
||||
dataset = client.update_dataset(dataset, ["access_entries"])
|
||||
# set permissions for dataset, public for everyone
|
||||
entry = bigquery.AccessEntry("READER", "specialGroup", "allAuthenticatedUsers")
|
||||
entries = list(dataset.access_entries)
|
||||
entries.append(entry)
|
||||
dataset.access_entries = entries
|
||||
dataset = client.update_dataset(dataset, ["access_entries"])
|
||||
|
||||
# transforms temporary UDF to persistent UDFs and publishes them
|
||||
for definition in raw_udf.definitions:
|
|
@ -1,119 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
#!/bin/sh
|
||||
|
||||
"""
|
||||
This script publishes all user-defined functions in a directory as persistent
|
||||
UDFs in the {dirname} dataset.
|
||||
# Publish UDFs.
|
||||
|
||||
The {dirname}_ prefix will be stripped from names of published UDFs.
|
||||
"""
|
||||
cd "$(dirname "$0")/.."
|
||||
|
||||
from argparse import ArgumentParser
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
|
||||
from google.cloud import bigquery
|
||||
from gcloud import storage
|
||||
|
||||
# sys.path needs to be modified to enable package imports from parent
|
||||
# and sibling directories. Also see:
|
||||
# https://stackoverflow.com/questions/6323860/sibling-package-imports/23542795#23542795
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from bigquery_etl.parse_udf import read_udf_dirs, accumulate_dependencies # noqa E402
|
||||
|
||||
|
||||
UDF_RE = re.compile(r"(udf_js|udf|legacy)(?:\.|_)([a-zA-z0-9_]+)")
|
||||
|
||||
OPTIONS_LIB_RE = re.compile(r'library = "gs://[^"]+/([^"]+)"')
|
||||
|
||||
SKIP = {"udf/main_summary_scalars.sql"}
|
||||
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"--project-id", default="moz-fx-data-derived-datasets", help="The project ID."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--udf-dirs",
|
||||
nargs="+",
|
||||
default=["udf/", "udf_js/"],
|
||||
help=(
|
||||
"The directories where declarations of UDFs are stored."
|
||||
" The names of these directories serve also to specify the target datasets"
|
||||
" the UDFs get published in."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gcs-bucket",
|
||||
default="moz-fx-data-prod-bigquery-etl",
|
||||
help="The GCS bucket where dependency files are uploaded to.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gcs-path",
|
||||
default="",
|
||||
help="The GCS path in the bucket where dependency files are uploaded to.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dep-dir",
|
||||
default="udf_js/lib/",
|
||||
help="The directory JavaScript dependency files for UDFs are stored.",
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
args = parser.parse_args()
|
||||
|
||||
raw_udfs = read_udf_dirs(*args.udf_dirs)
|
||||
published_udfs = []
|
||||
client = bigquery.Client(args.project_id)
|
||||
|
||||
if args.dep_dir:
|
||||
push_dependencies_to_gcs(
|
||||
args.gcs_bucket, args.gcs_path, args.dep_dir, args.project_id
|
||||
)
|
||||
|
||||
for raw_udf in raw_udfs:
|
||||
# get all dependencies for UDF and publish as persistent UDF
|
||||
udfs_to_publish = accumulate_dependencies([], raw_udfs, raw_udf)
|
||||
|
||||
udfs_to_publish.append(raw_udf)
|
||||
for dep in udfs_to_publish:
|
||||
if dep not in published_udfs and raw_udfs[dep].filepath not in SKIP:
|
||||
publish_persistent_udf(
|
||||
raw_udfs[dep],
|
||||
client,
|
||||
args.project_id,
|
||||
args.gcs_bucket,
|
||||
args.gcs_path,
|
||||
)
|
||||
published_udfs.append(dep)
|
||||
|
||||
|
||||
def publish_persistent_udf(raw_udf, client, project_id, gcs_bucket, gcs_path):
|
||||
# transforms temporary UDF to persistent UDFs and publishes them
|
||||
for definition in raw_udf.definitions:
|
||||
# Within a standard SQL function, references to other entities require
|
||||
# explicit project IDs
|
||||
query_with_renamed_udfs = UDF_RE.sub(
|
||||
"`" + project_id + "`." + r"\1" + "." + r"\2", definition
|
||||
)
|
||||
|
||||
# adjust paths for dependencies stored in GCS
|
||||
query = OPTIONS_LIB_RE.sub(
|
||||
fr'library = "gs://{gcs_bucket}/{gcs_path}\1"', query_with_renamed_udfs
|
||||
)
|
||||
|
||||
client.query(query).result()
|
||||
|
||||
|
||||
def push_dependencies_to_gcs(bucket, path, dep_dir, project_id):
|
||||
client = storage.Client(project_id)
|
||||
bucket = client.get_bucket(bucket)
|
||||
|
||||
for root, dirs, files in os.walk(dep_dir):
|
||||
for filename in files:
|
||||
blob = bucket.blob(path + filename)
|
||||
blob.upload_from_filename(os.path.join(root, filename))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
exec python3 -m "$@"
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
#!/bin/sh
|
||||
|
||||
# Publish UDFs to the public mozfun project.
|
||||
# TODO: consolidate public UDFs and UDFs used internally.
|
||||
|
||||
cd "$(dirname "$0")/.."
|
||||
|
||||
exec python3 -m bigquery_etl.udf.publish_public_udfs "$@"
|
||||
exec python3 -m bigquery_etl.udf.publish_udfs --project_id=mozfun --udf-dirs=mozfun \
|
||||
--dependency_dir=lib/ --gcs-bucket=mozfun --public=True "$@"
|
||||
|
|
Loading…
Reference in new issue