* Event flow monitoring

* Script query for getting flow source/target pairs

* Cross-app script for event flow monitoring

* Add event flows

* Improve timestamp handling for events

* Add handling for accounts to event_flow_monitoring

* Handle null categories in event flow monitoring

* Limit number of events in event flow monitoring
Anna Scholtz 2024-01-11 11:43:18 -08:00 committed by GitHub
Parent 4c3459d2c3
Commit 8dd0e09aa1
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 387 additions and 17 deletions

View file

@@ -419,7 +419,6 @@ generate:
skip_existing: # Skip automatically updating the following artifacts
- sql/moz-fx-data-shared-prod/fenix/client_deduplication/**
- sql/moz-fx-data-shared-prod/org_mozilla_tv_firefox_derived/baseline_clients_last_seen_v1/checks.sql
event_monitoring:
skip_apps:
- mlhackweek_search
- regrets_reporter

View file

@@ -10,12 +10,14 @@ from bigquery_etl.cli.utils import (
table_matches_patterns,
use_cloud_function_option,
)
from bigquery_etl.config import ConfigLoader
from sql_generators.glean_usage import (
baseline_clients_daily,
baseline_clients_first_seen,
baseline_clients_last_seen,
clients_last_seen_joined,
event_error_monitoring,
event_flow_monitoring,
event_monitoring_live,
events_unnested,
glean_app_ping_views,
@@ -36,14 +38,9 @@ GLEAN_TABLES = [
clients_last_seen_joined.ClientsLastSeenJoined(),
event_monitoring_live.EventMonitoringLive(),
event_error_monitoring.EventErrorMonitoring(),
event_flow_monitoring.EventFlowMonitoring(),
]
# * mlhackweek_search was an experiment that we don't want to generate tables
# for
# * regrets_reporter currently refers to two applications, skip the glean
# one to avoid confusion: https://github.com/mozilla/bigquery-etl/issues/2499
SKIP_APPS = ["mlhackweek_search", "regrets_reporter", "regrets_reporter_ucs"]
@click.command()
@click.option(
@@ -113,7 +110,12 @@ def generate(
baseline_table
for baseline_table in baseline_tables
if baseline_table.split(".")[1]
not in [f"{skipped_app}_stable" for skipped_app in SKIP_APPS]
not in [
f"{skipped_app}_stable"
for skipped_app in ConfigLoader.get(
"generate", "glean_usage", "skip_apps", fallback=[]
)
]
]
output_dir = Path(output_dir) / target_project
@@ -123,7 +125,12 @@ def generate(
if app_name:
app_info = {name: info for name, info in app_info.items() if name == app_name}
app_info = [info for name, info in app_info.items() if name not in SKIP_APPS]
app_info = [
info
for name, info in app_info.items()
if name
not in ConfigLoader.get("generate", "glean_usage", "skip_apps", fallback=[])
]
# Prepare parameters so that generation of all Glean datasets can be done in parallel

View file

@@ -18,7 +18,12 @@ class BaselineClientsFirstSeenTable(GleanTable):
self.custom_render_kwargs = {}
def generate_per_app_id(
self, project_id, baseline_table, output_dir=None, use_cloud_function=True, app_info=[]
self,
project_id,
baseline_table,
output_dir=None,
use_cloud_function=True,
app_info=[],
):
"""Generate per-app_id datasets."""
self.custom_render_kwargs = dict(
@@ -36,9 +41,5 @@
)
GleanTable.generate_per_app_id(
self,
project_id,
baseline_table,
output_dir=output_dir,
app_info=app_info
self, project_id, baseline_table, output_dir=output_dir, app_info=app_info
)

View file

@@ -0,0 +1,94 @@
"""Generate Aggregate table for monitoring event flows."""
import os
from collections import namedtuple
from pathlib import Path
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
from bigquery_etl.config import ConfigLoader
from sql_generators.glean_usage.common import (
GleanTable,
get_table_dir,
render,
write_sql,
)
AGGREGATE_TABLE_NAME = "event_flow_monitoring_aggregates_v1"
TARGET_DATASET_CROSS_APP = "monitoring"
PREFIX = "event_flow_monitoring"
PATH = Path(os.path.dirname(__file__))
class EventFlowMonitoring(GleanTable):
"""Represents the generated aggregated table for event flow monitoring."""
def __init__(self) -> None:
self.no_init = False
self.per_app_id_enabled = False
self.per_app_enabled = False
self.across_apps_enabled = True
self.prefix = PREFIX
self.target_table_id = AGGREGATE_TABLE_NAME
self.custom_render_kwargs = {}
self.base_table_name = "events_unnested"
def generate_across_apps(
self, project_id, apps, output_dir=None, use_cloud_function=True
):
"""Generate a query across all apps."""
if not self.across_apps_enabled:
return
apps = [app[0]["app_name"] for app in apps]
render_kwargs = dict(
project_id=project_id,
target_table=f"{TARGET_DATASET_CROSS_APP}_derived.{AGGREGATE_TABLE_NAME}",
apps=apps,
)
render_kwargs.update(self.custom_render_kwargs)
skip_existing_artifacts = self.skip_existing(output_dir, project_id)
Artifact = namedtuple("Artifact", "table_id basename sql")
query_filename = f"{AGGREGATE_TABLE_NAME}.script.sql"
script_sql = render(
query_filename, template_folder=PATH / "templates", **render_kwargs
)
metadata = render(
f"{AGGREGATE_TABLE_NAME}.metadata.yaml",
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)
schema = render(
f"{AGGREGATE_TABLE_NAME}.schema.yaml",
template_folder=PATH / "templates",
format=False,
**render_kwargs,
)
table = (
f"{project_id}.{TARGET_DATASET_CROSS_APP}_derived.{AGGREGATE_TABLE_NAME}"
)
if output_dir:
artifacts = [
Artifact(table, "metadata.yaml", metadata),
Artifact(table, "script.sql", script_sql),
Artifact(table, "schema.yaml", schema)
]
for artifact in artifacts:
destination = (
get_table_dir(output_dir, artifact.table_id) / artifact.basename
)
skip_existing = destination in skip_existing_artifacts
write_sql(
output_dir,
artifact.table_id,
artifact.basename,
artifact.sql,
skip_existing=skip_existing,
)
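
For orientation, here is a minimal sketch of driving this generator class directly, outside of the usual ./bqetl generate glean_usage entry point; the nested app listing shape and the example app names are assumptions inferred from the app[0]["app_name"] access above:

from pathlib import Path

from sql_generators.glean_usage.event_flow_monitoring import EventFlowMonitoring

# Assumed shape of the app listings passed in by the glean_usage generator;
# app names here are only examples.
apps = [
    [{"app_name": "fenix"}],
    [{"app_name": "firefox_desktop"}],
]

generator = EventFlowMonitoring()
# Renders script.sql, metadata.yaml and schema.yaml for
# monitoring_derived.event_flow_monitoring_aggregates_v1 under sql/.
generator.generate_across_apps(
    "moz-fx-data-shared-prod",
    apps,
    output_dir=Path("sql"),
)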

View file

@@ -36,7 +36,12 @@ class EventMonitoringLive(GleanTable):
self.base_table_name = "events_v1"
def generate_per_app_id(
self, project_id, baseline_table, output_dir=None, use_cloud_function=True, app_info=[]
self,
project_id,
baseline_table,
output_dir=None,
use_cloud_function=True,
app_info=[],
):
tables = table_names_from_baseline(baseline_table, include_project_id=False)

View file

@@ -0,0 +1,23 @@
friendly_name: Event Flow Monitoring Aggregates
description: |-
Aggregates of event flows (funnels based on flow_id) in event pings coming from all Glean apps.
owners:
- akomar@mozilla.com
- ascholtz@mozilla.com
labels:
incremental: true
scheduling:
dag_name: bqetl_monitoring
referenced_tables:
- ['moz-fx-data-shared-prod', '*_stable', 'events_v1']
date_partition_parameter: null
parameters: ["submission_date:DATE:{{ds}}"]
bigquery:
time_partitioning:
type: day
field: submission_date
require_partitions_filter: false
clustering:
fields:
- normalized_app_name
- channel

View file

@@ -0,0 +1,54 @@
fields:
- mode: NULLABLE
name: submission_date
type: DATE
description: The date when the flow was captured
- mode: NULLABLE
name: flow_id
type: STRING
description: Unique identifier for the specific flow
- fields:
- fields:
- name: category
type: STRING
description: Event category
- name: name
type: STRING
description: Event name
- name: timestamp
type: TIMESTAMP
description: Event timestamp
mode: NULLABLE
name: source
type: RECORD
description: Source event
- fields:
- name: category
type: STRING
description: Event category
- name: name
type: STRING
description: Event name
- name: timestamp
type: TIMESTAMP
description: Event timestamp
mode: NULLABLE
name: target
type: RECORD
description: Target event
mode: REPEATED
name: events
type: RECORD
description: Flow events
- mode: NULLABLE
name: normalized_app_name
type: STRING
description: The name of the app the event flow is coming from
- mode: NULLABLE
name: channel
type: STRING
description: The app channel
- mode: NULLABLE
name: flow_hash
type: STRING
description: Hash of the complete event flow
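
To make the nested structure concrete, a single aggregated row following this schema could look like the sketch below (all values are hypothetical; timestamps shown as strings for brevity):

example_row = {
    "submission_date": "2024-01-11",
    "flow_id": "c1d2e3f4-0000-4aaa-bbbb-1234567890ab",
    "events": [
        # one source -> target pair per consecutive pair of events in the flow
        {
            "source": {"category": "onboarding", "name": "card_view", "timestamp": "2024-01-11 10:00:01"},
            "target": {"category": "onboarding", "name": "card_click", "timestamp": "2024-01-11 10:00:05"},
        },
        {
            "source": {"category": "onboarding", "name": "card_click", "timestamp": "2024-01-11 10:00:05"},
            "target": {"category": "settings", "name": "opened", "timestamp": "2024-01-11 10:00:09"},
        },
    ],
    "normalized_app_name": "fenix",
    "channel": "release",
    "flow_hash": "onboarding.card_view -> onboarding.card_click -> settings.opened",
}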

View file

@@ -0,0 +1,187 @@
-- Generated via ./bqetl generate glean_usage
-- This table aggregates event flows across Glean applications.
DECLARE dummy INT64; -- dummy variable to indicate to BigQuery this is a script
CREATE TEMP TABLE
event_flows(
submission_date DATE,
flow_id STRING,
normalized_app_name STRING,
channel STRING,
events ARRAY<
STRUCT<
source STRUCT<category STRING, name STRING, timestamp TIMESTAMP>,
target STRUCT<category STRING, name STRING, timestamp TIMESTAMP>
>
>,
flow_hash STRING
) AS (
-- get events from all apps that are related to some flow (have 'flow_id' in event_extras)
WITH all_app_events AS (
{% for app in apps -%}
{% if not loop.first -%}
UNION ALL
{% endif %}
{% if app not in ["telemetry", "accounts_frontend", "accounts_backend"] %}
SELECT DISTINCT
@submission_date AS submission_date,
ext.value AS flow_id,
event_category AS category,
event_name AS name,
TIMESTAMP_ADD(
submission_timestamp,
-- limit event.timestamp, otherwise this will cause an overflow
INTERVAL LEAST(event_timestamp, 20000000000000) MILLISECOND
) AS timestamp,
"{{ app }}" AS normalized_app_name,
client_info.app_channel AS channel
FROM
`moz-fx-data-shared-prod.{{ app }}.events_unnested`,
UNNEST(event_extra) AS ext
WHERE
DATE(submission_timestamp) = @submission_date
AND ext.key = "flow_id"
{% elif app in ["accounts_frontend", "accounts_backend"] %}
SELECT DISTINCT
@submission_date AS submission_date,
metrics.string.session_flow_id AS flow_id,
NULL AS category,
metrics.string.event_name AS name,
submission_timestamp AS timestamp,
"{{ app }}" AS normalized_app_name,
client_info.app_channel AS channel
FROM
`moz-fx-data-shared-prod.{{ app }}.accounts_events`
WHERE
DATE(submission_timestamp) = @submission_date
AND metrics.string.session_flow_id != ""
{% endif %}
{% endfor %}
),
-- determine events that belong to the same flow
new_event_flows AS (
SELECT
@submission_date AS submission_date,
flow_id,
normalized_app_name,
channel,
ARRAY_AGG(
(SELECT AS STRUCT category AS category, name AS name, timestamp AS timestamp)
ORDER BY timestamp
LIMIT 100 -- limit number of events considered per flow
) AS events
FROM
all_app_events
GROUP BY
flow_id,
normalized_app_name,
channel
),
unnested_events AS (
SELECT
new_event_flows.*,
event,
event_offset
FROM
new_event_flows,
UNNEST(events) AS event
WITH OFFSET AS event_offset
),
-- create source -> target event pairs based on the order of when the events were seen
source_target_events AS (
SELECT
prev_event.flow_id,
prev_event.normalized_app_name,
prev_event.channel,
ARRAY_AGG(
STRUCT(prev_event.event AS source, cur_event.event AS target)
ORDER BY
prev_event.event.timestamp
) AS events
FROM
unnested_events AS prev_event
INNER JOIN
unnested_events AS cur_event
ON
prev_event.flow_id = cur_event.flow_id
AND prev_event.event_offset = cur_event.event_offset - 1
GROUP BY
flow_id,
normalized_app_name,
channel
)
SELECT
@submission_date AS submission_date,
flow_id,
normalized_app_name,
channel,
ARRAY_AGG(event ORDER BY event.source.timestamp) AS events,
-- create a flow hash that concats all the events that are part of the flow
-- <event_category>.<event_name> -> <event_category>.<event_name> -> ...
ARRAY_TO_STRING(
ARRAY_CONCAT(
ARRAY_AGG(
CONCAT(IF(event.source.category IS NOT NULL, CONCAT(event.source.category, "."), ""), event.source.name)
ORDER BY
event.source.timestamp
),
[
ARRAY_REVERSE(
ARRAY_AGG(
CONCAT(IF(event.target.category IS NOT NULL, CONCAT(event.target.category, "."), ""), event.target.name)
ORDER BY
event.source.timestamp
)
)[SAFE_OFFSET(0)]
]
),
" -> "
) AS flow_hash
FROM
(
SELECT
flow_id,
normalized_app_name,
channel,
event
FROM
source_target_events,
UNNEST(events) AS event
UNION ALL
-- some flows might go over multiple days;
-- use previously seen flows and combine with new flows
SELECT
flow_id,
normalized_app_name,
channel,
event
FROM
`{{ project_id }}.{{ target_table }}`,
UNNEST(events) AS event
WHERE
submission_date > DATE_SUB(@submission_date, INTERVAL 3 DAY)
)
GROUP BY
flow_id,
normalized_app_name,
channel
);
MERGE
`{{ project_id }}.{{ target_table }}` r
USING
event_flows f
ON
r.flow_id = f.flow_id
-- look back up to 3 days to see if a flow has seen new events and needs to be replaced
AND r.submission_date > DATE_SUB(@submission_date, INTERVAL 3 DAY)
WHEN NOT MATCHED
THEN
INSERT
(submission_date, flow_id, events, normalized_app_name, channel, flow_hash)
VALUES
(f.submission_date, f.flow_id, f.events, f.normalized_app_name, f.channel, f.flow_hash)
WHEN NOT MATCHED BY SOURCE
-- look back up to 3 days to see if a flow has seen new events and needs to be replaced
AND r.submission_date > DATE_SUB(@submission_date, INTERVAL 3 DAY)
THEN
DELETE;
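
In words, the flow_hash built above takes every source event label in timestamp order and appends the label of the final target event, joining everything with " -> ". A rough Python equivalent of that aggregation, assuming the pairs are already ordered by source timestamp:

def flow_hash(pairs):
    """pairs: list of {"source": {...}, "target": {...}} event pairs, ordered by source timestamp."""
    def label(event):
        # "<category>.<name>" when a category exists, otherwise just "<name>"
        if event.get("category") is not None:
            return f'{event["category"]}.{event["name"]}'
        return event["name"]

    labels = [label(pair["source"]) for pair in pairs]
    if pairs:
        # the last target closes the chain, mirroring ARRAY_REVERSE(...)[SAFE_OFFSET(0)]
        labels.append(label(pairs[-1]["target"]))
    return " -> ".join(labels)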

View file

@@ -98,4 +98,4 @@ def generate(target_project, output_dir, use_cloud_function):
final_schema_path = final_path / "schema.yaml"
if os.path.exists(source_schema_path):
shutil.copyfile(source_schema_path, final_schema_path)
shutil.copyfile(source_schema_path, final_schema_path)