Add a generator for events stream tables (#4655)

* Add a generator for events stream tables

Open questions:

* How does init work?
  * Is this manually triggered? How do we backfill to a certain date?
* The schema is defined in the SQL query. How does this behave when the
  schema changes in the future?
* Configuration: right now it is inline in Python. Should we change this?

TODO:

* Check table schema

* Store category and name separately to help with filtering and clustering

* Concat into full event name using an array to avoid NULL issues (see the sketch after this list)

* events stream: Read allowed apps from project configuration

* events stream: Cluster by event category

* Remove trailing commas
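
As a quick illustration of the NULL issue above (a sketch, not part of this change): in BigQuery, CONCAT returns NULL as soon as any argument is NULL, while ARRAY_TO_STRING simply omits NULL elements, so a missing category or name still yields a usable event string. A minimal, runnable example:

SELECT
  -- CONCAT propagates NULL: this yields NULL
  CONCAT('category', '.', CAST(NULL AS STRING)) AS concat_result,
  -- ARRAY_TO_STRING omits NULL elements: this yields 'category'
  ARRAY_TO_STRING(['category', CAST(NULL AS STRING)], ',') AS array_result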

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Update sql_generators/glean_usage/events_stream.py

* Update sql_generators/glean_usage/templates/events_stream_v1.metadata.yaml

* Update sql_generators/glean_usage/templates/events_stream_v1.query.sql

* Update sql_generators/glean_usage/templates/events_stream_v1.query.sql

* Update sql_generators/glean_usage/templates/events_stream_v1.query.sql

* Update sql_generators/glean_usage/templates/events_stream_v1.query.sql

* Update sql_generators/glean_usage/templates/events_stream_v1.query.sql

* Update sql_generators/glean_usage/common.py

* Update sql_generators/glean_usage/events_stream.py

---------

Co-authored-by: Anna Scholtz <anna@scholtzan.net>
This commit is contained in:
Jan-Erik Rediger 2024-01-16 19:56:11 +01:00 committed by GitHub
Parent c3fa65a30e
Commit 1c7e4b35a4
No known key found for this signature
GPG key ID: B5690EEEBB952194
9 changed files with 190 additions and 0 deletions

View file

@@ -422,3 +422,10 @@ generate:
      - mlhackweek_search
      - regrets_reporter
      - regrets_reporter_ucs
    events_stream:
      datasets:
        - fenix
      app_ids:
        - org_mozilla_firefox
        - org_mozilla_fenix
        - org_mozilla_fenix_beta

View file

@@ -19,6 +19,7 @@ from sql_generators.glean_usage import (
    event_error_monitoring,
    event_flow_monitoring,
    event_monitoring_live,
    events_stream,
    events_unnested,
    glean_app_ping_views,
    metrics_clients_daily,
@@ -39,6 +40,7 @@ GLEAN_TABLES = [
    event_monitoring_live.EventMonitoringLive(),
    event_error_monitoring.EventErrorMonitoring(),
    event_flow_monitoring.EventFlowMonitoring(),
    events_stream.EventsStreamTable(),
]

View file

@@ -120,6 +120,9 @@ def table_names_from_baseline(baseline_table, include_project_id=True):
        last_seen_view=f"{prefix}.baseline_clients_last_seen",
        first_seen_view=f"{prefix}.baseline_clients_first_seen",
        event_monitoring=f"{prefix}_derived.event_monitoring_live_v1",
        events_view=f"{prefix}.events",
        events_stream_table=f"{prefix}_derived.events_stream_v1",
        events_stream_view=f"{prefix}.events_stream",
    )
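
For illustration only (the project and app IDs here are hypothetical, not part of this diff): for a baseline table like `moz-fx-data-shared-prod.org_mozilla_firefox_stable.baseline_v1`, the prefix resolves to `moz-fx-data-shared-prod.org_mozilla_firefox`, so the new entries would point at:

-- Hypothetical resolved names for app_id org_mozilla_firefox:
--   events_view         => moz-fx-data-shared-prod.org_mozilla_firefox.events
--   events_stream_table => moz-fx-data-shared-prod.org_mozilla_firefox_derived.events_stream_v1
--   events_stream_view  => moz-fx-data-shared-prod.org_mozilla_firefox.events_stream
SELECT * FROM `moz-fx-data-shared-prod.org_mozilla_firefox.events_stream` LIMIT 10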

View file

@@ -0,0 +1,58 @@
"""Generate events stream queries for Glean apps."""

import re

from bigquery_etl.config import ConfigLoader
from sql_generators.glean_usage.common import GleanTable

TARGET_TABLE_ID = "events_stream_v1"
PREFIX = "events_stream"


class EventsStreamTable(GleanTable):
    """Represents generated events_stream table."""

    def __init__(self):
        """Initialize events_stream table."""
        GleanTable.__init__(self)
        self.target_table_id = TARGET_TABLE_ID
        self.prefix = PREFIX
        self.no_init = True
        self.per_app_enabled = True
        self.per_app_id_enabled = True
        self.across_apps_enabled = True
        self.cross_channel_template = "cross_channel_events_stream.query.sql"
        self.base_table_name = "events_v1"

    def generate_per_app_id(
        self,
        project_id,
        baseline_table,
        output_dir=None,
        use_cloud_function=True,
        app_info=[],
    ):
        """Generate the events_stream table query per app_id."""
        # Get the app ID from the baseline_table name.
        # This is what `common.py` also does.
        app_id = re.sub(r"_stable\..+", "", baseline_table)
        app_id = ".".join(app_id.split(".")[1:])

        # Skip any not-allowed app.
        if app_id not in ConfigLoader.get(
            "generate", "glean_usage", "events_stream", "app_ids", fallback=[]
        ):
            return

        super().generate_per_app_id(
            project_id, baseline_table, output_dir, use_cloud_function, app_info
        )

    def generate_per_app(
        self, project_id, app_info, output_dir=None, use_cloud_function=True
    ):
        """Generate the events_stream table query per app_name."""
        target_dataset = app_info[0]["app_name"]
        if target_dataset in ConfigLoader.get(
            "generate", "glean_usage", "events_stream", "datasets", fallback=[]
        ):
            super().generate_per_app(project_id, app_info, output_dir)

View file

@@ -0,0 +1,20 @@
-- Generated via ./bqetl generate glean_usage
CREATE OR REPLACE VIEW
  `{{ project_id }}.{{ target_view }}`
AS
{% for (dataset, channel) in datasets %}
{% if not loop.first -%}
UNION ALL
{% endif -%}
SELECT
  "{{ dataset }}" AS normalized_app_id,
  e.* REPLACE (
    {% if app_name == "fenix" -%}
    mozfun.norm.fenix_app_info("{{ dataset }}", client_info.app_build).channel AS normalized_channel
    {% elif datasets|length > 1 -%}
    "{{ channel }}" AS normalized_channel
    {% endif -%}
  ),
FROM
  `{{ project_id }}.{{ dataset }}_derived.events_stream` AS e
{% endfor %}
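
For context, a sketch of roughly what this template renders to for fenix, using the datasets from the config above; the project ID and the elided third branch are assumptions, not generator output:

CREATE OR REPLACE VIEW
  `moz-fx-data-shared-prod.fenix.events_stream`
AS
SELECT
  "org_mozilla_firefox" AS normalized_app_id,
  e.* REPLACE (
    mozfun.norm.fenix_app_info("org_mozilla_firefox", client_info.app_build).channel AS normalized_channel
  ),
FROM
  `moz-fx-data-shared-prod.org_mozilla_firefox_derived.events_stream` AS e
UNION ALL
SELECT
  "org_mozilla_fenix" AS normalized_app_id,
  e.* REPLACE (
    mozfun.norm.fenix_app_info("org_mozilla_fenix", client_info.app_build).channel AS normalized_channel
  ),
FROM
  `moz-fx-data-shared-prod.org_mozilla_fenix_derived.events_stream` AS e
-- ...plus a third branch for org_mozilla_fenix_beta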

View file

@@ -0,0 +1,8 @@
{{ header_yaml }}
friendly_name: Events Stream
description: |-
  An events stream table, with one row per event.
owners:
- jrediger@mozilla.com
- wstuckey@mozilla.com

View file

@@ -0,0 +1,9 @@
{{ header }}
CREATE OR REPLACE VIEW
  `{{ project_id }}.{{ events_stream_view }}`
AS
SELECT
  *
FROM
  `{{ project_id }}.{{ events_stream_table }}`

View file

@@ -0,0 +1,25 @@
{{ header_yaml }}
friendly_name: Events Stream
description: |-
  An events stream table, with one row per event.
owners:
- jrediger@mozilla.com
- wstuckey@mozilla.com
labels:
  incremental: true
  schedule: daily
scheduling:
  dag_name: bqetl_glean_usage
  task_group: {{ app_name }}
bigquery:
  time_partitioning:
    type: day
    field: submission_date
    require_partition_filter: true
  clustering:
    fields:
    - normalized_channel
    - sample_id
    - event_category
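
Since require_partition_filter is true and the table clusters on normalized_channel, sample_id, and event_category, consumers have to pin submission_date and can reduce scanned bytes by also filtering on the clustered columns. A hypothetical consumer query (table name and filter values assumed):

SELECT
  event,
  COUNT(*) AS n
FROM
  `moz-fx-data-shared-prod.org_mozilla_firefox_derived.events_stream_v1`
WHERE
  -- required: the table rejects queries without a partition filter
  submission_date = '2024-01-15'
  -- clustered columns: these filters prune storage blocks
  AND normalized_channel = 'release'
  AND event_category = 'events'
GROUP BY
  event
ORDER BY
  n DESC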

View file

@@ -0,0 +1,58 @@
{{ header }}
WITH base AS (
  SELECT
    * EXCEPT (metrics, events, name, category, extra, timestamp) REPLACE (
      STRUCT(
        client_info.app_build AS app_build,
        client_info.app_channel AS app_channel,
        client_info.app_display_version AS app_display_version,
        client_info.architecture AS architecture,
        client_info.device_manufacturer AS device_manufacturer,
        client_info.device_model AS device_model,
        client_info.first_run_date AS first_run_date,
        client_info.locale AS locale,
        client_info.os AS os,
        client_info.os_version AS os_version,
        client_info.telemetry_sdk_build AS telemetry_sdk_build,
        client_info.build_date AS build_date
      ) AS client_info,
      STRUCT(
        ping_info.seq,
        ping_info.start_time,
        ping_info.parsed_start_time,
        ping_info.end_time,
        ping_info.parsed_end_time,
        ping_info.ping_type
      ) AS ping_info
    ),
    DATE(submission_timestamp) AS submission_date,
    client_info.client_id AS client_id,
    ping_info.reason AS reason,
    `mozfun.json.from_map`(ping_info.experiments) AS experiments,
    SAFE.TIMESTAMP_ADD(
      ping_info.parsed_start_time,
      INTERVAL event.timestamp MILLISECOND
    ) AS event_timestamp,
    event.category AS event_category,
    event.name AS event_name,
    -- ARRAY_TO_STRING handles NULL values better than CONCAT
    ARRAY_TO_STRING([event.category, event.name], ',') AS event,
    `mozfun.json.from_map`(event.extra) AS event_extra,
  FROM
    `{{ events_view }}` AS e
  CROSS JOIN
    UNNEST(events) AS event
  WHERE
    {% raw %}
    {% if is_init() %}
      DATE(submission_timestamp) >= '2023-11-01'
    {% else %}
      DATE(submission_timestamp) = @submission_date
    {% endif %}
    {% endraw %}
)
--
SELECT
  *
FROM
  base
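
The is_init() branch above speaks to the "How does init work? / How do we backfill to a certain date?" question from the commit message: an initialization run scans everything from the hard-coded 2023-11-01 onward, while a scheduled run fills exactly one partition. Rendered, the WHERE clause becomes one of the following (a sketch, not literal generator output):

-- Initialization / backfill run:
WHERE
  DATE(submission_timestamp) >= '2023-11-01'

-- Scheduled daily run, parameterized by the orchestrator:
WHERE
  DATE(submission_timestamp) = @submission_date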