Add parsed start and end times to views on top of Glean schemas

Closes https://github.com/mozilla/gcp-ingestion/issues/633
This commit is contained in:
Jeff Klukas 2020-01-03 12:00:09 -05:00
Родитель b90f4cd1b1
Коммит be92fd1a4e
2 изменённых файлов: 53 добавлений и 5 удалений

Просмотреть файл

@ -28,7 +28,7 @@ DEFAULT_EXCLUDE = r"*_raw"
VIEW_QUERY_TEMPLATE = """
SELECT
* REPLACE(
`moz-fx-data-shared-prod.udf.normalize_metadata`(metadata) AS metadata)
{replacements})
FROM
`{target}`
"""
@ -72,7 +72,7 @@ def main():
client = bigquery.Client()
views = get_views(client, args.patterns)
create_views_if_not_exist(views, args.exclude, args.sql_dir)
create_views_if_not_exist(client, views, args.exclude, args.sql_dir)
def get_views(client, patterns):
@ -118,7 +118,7 @@ def get_views(client, patterns):
return views
def create_views_if_not_exist(views, exclude, sql_dir):
def create_views_if_not_exist(client, views, exclude, sql_dir):
# create views unless a local file for creating the view exists
for view, tables in views.items():
if any(fnmatchcase(pattern, view) for pattern in exclude):
@ -140,13 +140,29 @@ def create_views_if_not_exist(views, exclude, sql_dir):
project, dataset, viewname = view.split(".")
target = f"{view}_v{version}"
view_query = VIEW_QUERY_TEMPLATE.format(target=target).strip()
view_dataset = dataset.rsplit("_", 1)[0]
full_view_id = ".".join([project, view_dataset, viewname])
target_file = os.path.join(sql_dir, view_dataset, viewname, "view.sql")
full_sql = f"CREATE OR REPLACE VIEW\n `{full_view_id}`\nAS {view_query}\n"
if not os.path.exists(target_file):
# We put this BQ API all inside the conditional to speed up execution
# in the case target files already exist.
table = client.get_table(target)
replacements = [
"`moz-fx-data-shared-prod.udf.normalize_metadata`(metadata)"
" AS metadata"
]
if "ping_info" in (f.name for f in table.schema):
# If we have ping_info, this must be a Glean schema.
replacements += [
"`moz-fx-data-shared-prod.udf.normalize_glean_ping_info`(ping_info)"
" AS ping_info"
]
replacements = ",\n ".join(replacements)
view_query = VIEW_QUERY_TEMPLATE.format(
target=target, replacements=replacements
).strip()
full_sql = f"CREATE OR REPLACE VIEW\n `{full_view_id}`\nAS {view_query}\n"
print("Creating " + target_file)
if not os.path.exists(os.path.dirname(target_file)):
os.makedirs(os.path.dirname(target_file))

Просмотреть файл

@ -0,0 +1,32 @@
/*
Accepts a glean ping_info struct as input and returns a modified struct that
includes a few parsed or normalized variants of the input fields.
*/
CREATE TEMP FUNCTION
udf_normalize_glean_ping_info(ping_info ANY TYPE) AS ((
SELECT
AS STRUCT
ping_info.*,
SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time) AS parsed_start_time,
SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.end_time) AS parsed_end_time));
-- Tests
SELECT
assert_equals(
TIMESTAMP '2019-12-01 09:22:00',
udf_normalize_glean_ping_info(
STRUCT('2019-12-01T20:22+11:00' AS start_time,
'2019-12-01T21:24+11:00' AS end_time)).parsed_start_time),
assert_equals(
TIMESTAMP '2019-12-01 10:24:00',
udf_normalize_glean_ping_info(
STRUCT('2019-12-01T20:22+11:00' AS start_time,
'2019-12-01T21:24+11:00' AS end_time)).parsed_end_time),
assert_null(
udf_normalize_glean_ping_info(
STRUCT('2019-12-01T20:22+11:00' AS start_time,
'2019-12-01T21:24:00+11:00' AS end_time)).parsed_end_time);