Materialized views and aggregated tables for event monitoring (#4478)
* WIP event monitoring
* Add FxA custom events to view definition (#4483)
* Add FxA custom events to view definition
* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql
* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql
* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql
* Update sql_generators/event_monitoring/templates/event_monitoring_live.init.sql

---------

Co-authored-by: Anna Scholtz <anna@scholtzan.net>

* Move event monitoring to glean_usage generator
* Add cross-app event monitoring view
* Generate cross app monitoring
* Simplify event monitoring aggregation

---------

Co-authored-by: akkomar <akkomar@users.noreply.github.com>
Parent: 1e42477c38
Commit: 185f833f2a

@@ -1292,7 +1292,13 @@ def initialize(name, sql_dir, project_id, dry_run):
         # allow name to be a path
         query_files = [Path(name)]
     else:
-        query_files = paths_matching_name_pattern(name, sql_dir, project_id)
+        file_regex = re.compile(
+            r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
+            r"(?:query\.sql|init\.sql)$"
+        )
+        query_files = paths_matching_name_pattern(
+            name, sql_dir, project_id, file_regex=file_regex
+        )

     if not query_files:
         click.echo(

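For context, the new file_regex restricts `bqetl query initialize` to paths ending in query.sql or init.sql. A quick sanity check of the pattern, using made-up paths (illustration only, not part of the change):

import re

# Same pattern as in the hunk above.
file_regex = re.compile(
    r"^.*/([a-zA-Z0-9-]+)/([a-zA-Z0-9_]+)/([a-zA-Z0-9_]+(_v[0-9]+)?)/"
    r"(?:query\.sql|init\.sql)$"
)

# Hypothetical paths for illustration.
assert file_regex.match(
    "sql/moz-fx-data-shared-prod/monitoring_derived/event_monitoring_aggregates_v1/query.sql"
)
assert file_regex.match(
    "sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/event_monitoring_live_v1/init.sql"
)
assert not file_regex.match(
    "sql/moz-fx-data-shared-prod/monitoring_derived/event_monitoring_aggregates_v1/metadata.yaml"
)
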
@@ -28,7 +28,6 @@ from ..view import View

 VIEW_FILE = "view.sql"
 QUERY_FILE = "query.sql"
-INIT_FILE = "init.sql"
 QUERY_SCRIPT = "query.py"
 ROOT = Path(__file__).parent.parent.parent
 TEST_DIR = ROOT / "tests" / "sql"

@@ -356,7 +355,7 @@ def _deploy_artifacts(ctx, artifact_files, project_id, dataset_suffix, sql_dir):
     query_files = [
         file
         for file in artifact_files
-        if file.name in [INIT_FILE, QUERY_FILE, QUERY_SCRIPT]
+        if file.name in [QUERY_FILE, QUERY_SCRIPT]
         # don't attempt to deploy wildcard or metadata tables
         and "*" not in file.parent.name and file.parent.name != "INFORMATION_SCHEMA"
     ]

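The deploy filter above now leaves init.sql files alone (presumably because materialized-view init files are handled outside stage deploys) and still skips wildcard and INFORMATION_SCHEMA paths. A small illustration with hypothetical paths:

from pathlib import Path

QUERY_FILE = "query.sql"
QUERY_SCRIPT = "query.py"

# Hypothetical artifact paths, for illustration only.
artifact_files = [
    Path("sql/moz-fx-data-shared-prod/monitoring_derived/event_monitoring_aggregates_v1/query.sql"),
    Path("sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/event_monitoring_live_v1/init.sql"),
    Path("sql/moz-fx-data-shared-prod/monitoring/INFORMATION_SCHEMA/query.sql"),
]

query_files = [
    file
    for file in artifact_files
    if file.name in [QUERY_FILE, QUERY_SCRIPT]
    # don't attempt to deploy wildcard or metadata tables
    and "*" not in file.parent.name and file.parent.name != "INFORMATION_SCHEMA"
]
assert [f.parent.name for f in query_files] == ["event_monitoring_aggregates_v1"]
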
@@ -188,6 +188,8 @@ dry_run:
     - sql/moz-fx-data-shared-prod/org_mozilla_firefox_beta_derived/experiment_events_live_v1/init.sql
     - sql/moz-fx-data-shared-prod/telemetry_derived/experiment_enrollment_cumulative_population_estimate_v1/view.sql
     - sql/moz-fx-data-shared-prod/telemetry/experiment_enrollment_cumulative_population_estimate/view.sql
+    - sql/moz-fx-data-shared-prod/**/event_monitoring_live_v1/init.sql
+    - sql/moz-fx-data-shared-prod/monitoring/event_monitoring_live/view.sql
     # Already exists (and lacks an "OR REPLACE" clause)
     - sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql
     - sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql

@@ -426,3 +428,8 @@ generate:
       - sql/moz-fx-data-shared-prod/mozilla_vpn/events/**
       - sql/moz-fx-data-shared-prod/mozilla_vpn/main/**
       - sql/moz-fx-data-shared-prod/fenix/client_deduplication/**
+  event_monitoring:
+    skip_apps:
+      - mlhackweek_search
+      - regrets_reporter
+      - regrets_reporter_ucs

@@ -165,6 +165,17 @@ with DAG(
         email=["ascholtz@mozilla.com"],
     )

+    monitoring_derived__event_monitoring_aggregates__v1 = bigquery_etl_query(
+        task_id="monitoring_derived__event_monitoring_aggregates__v1",
+        destination_table="event_monitoring_aggregates_v1",
+        dataset_id="monitoring_derived",
+        project_id="moz-fx-data-shared-prod",
+        owner="ascholtz@mozilla.com",
+        email=["akomar@mozilla.com", "ascholtz@mozilla.com"],
+        date_partition_parameter="submission_date",
+        depends_on_past=False,
+    )
+
     monitoring_derived__jobs_by_organization__v1 = gke_command(
         task_id="monitoring_derived__jobs_by_organization__v1",
         command=[

@@ -323,6 +334,10 @@ with DAG(
         wait_for_copy_deduplicate_main_ping
     )

+    monitoring_derived__event_monitoring_aggregates__v1.set_upstream(
+        wait_for_copy_deduplicate_all
+    )
+
     monitoring_derived__stable_and_derived_table_sizes__v1.set_upstream(
         wait_for_copy_deduplicate_all
     )

@@ -15,6 +15,7 @@ from sql_generators.glean_usage import (
     baseline_clients_first_seen,
     baseline_clients_last_seen,
     clients_last_seen_joined,
+    event_monitoring_live,
     events_unnested,
     glean_app_ping_views,
     metrics_clients_daily,

@@ -32,6 +33,7 @@ GLEAN_TABLES = [
     metrics_clients_daily.MetricsClientsDaily(),
     metrics_clients_last_seen.MetricsClientsLastSeen(),
     clients_last_seen_joined.ClientsLastSeenJoined(),
+    event_monitoring_live.EventMonitoringLive(),
 ]

 # * mlhackweek_search was an experiment that we don't want to generate tables

@@ -152,5 +154,22 @@ def generate(
         for table in GLEAN_TABLES
     ]

+    # Parameters to generate datasets that union all app datasets
+    generate_across_apps = [
+        (
+            partial(
+                table.generate_across_apps,
+                target_project,
+                output_dir=output_dir,
+                use_cloud_function=use_cloud_function,
+            ),
+            app_info,
+        )
+        for table in GLEAN_TABLES
+    ]
+
     with ProcessingPool(parallelism) as pool:
-        pool.map(lambda f: f[0](f[1]), generate_per_app_id + generate_per_app)
+        pool.map(
+            lambda f: f[0](f[1]),
+            generate_per_app_id + generate_per_app + generate_across_apps,
+        )

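Each entry in these lists pairs a pre-bound callable with the argument it will receive, so per-app-id, per-app, and cross-app generation can all be dispatched through the same pool.map call. A stripped-down sketch of that dispatch pattern (the worker function and app metadata below are placeholders, not the real generators):

from functools import partial
from multiprocessing.pool import ThreadPool  # stand-in for the ProcessingPool used above

def generate_across_apps(target_project, apps, output_dir=None, use_cloud_function=True):
    # placeholder for GleanTable.generate_across_apps
    print(f"cross-app generation for {target_project} into {output_dir}")

app_info = [[{"bq_dataset_family": "firefox_desktop"}]]  # hypothetical app metadata

generate_across_apps_tasks = [
    (
        partial(
            generate_across_apps,
            "moz-fx-data-shared-prod",
            output_dir="sql/",
            use_cloud_function=True,
        ),
        app_info,
    )
]

with ThreadPool(4) as pool:
    # each item is (callable, argument), mirroring the pool.map calls above
    pool.map(lambda f: f[0](f[1]), generate_across_apps_tasks)
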
@@ -101,6 +101,7 @@ def table_names_from_baseline(baseline_table, include_project_id=True):
         daily_view=f"{prefix}.baseline_clients_daily",
         last_seen_view=f"{prefix}.baseline_clients_last_seen",
         first_seen_view=f"{prefix}.baseline_clients_first_seen",
+        event_monitoring=f"{prefix}_derived.event_monitoring_live_v1",
     )

@@ -160,6 +161,7 @@ class GleanTable:
         self.no_init = True
         self.per_app_id_enabled = True
         self.per_app_enabled = True
+        self.across_apps_enabled = True
         self.cross_channel_template = "cross_channel.view.sql"

     def skip_existing(self, output_dir="sql/", project_id="moz-fx-data-shared-prod"):

@@ -255,8 +257,7 @@ class GleanTable:
         if not (referenced_table_exists(view_sql)):
             logging.info("Skipping view for table which doesn't exist:" f" {table}")
         else:
-            artifacts.append(
-                Artifact(view, "view.sql", view_sql))
+            artifacts.append(Artifact(view, "view.sql", view_sql))

         skip_existing_artifact = self.skip_existing(output_dir, project_id)

@@ -394,3 +395,11 @@ class GleanTable:

         write_dataset_metadata(output_dir, view)
         write_dataset_metadata(output_dir, table, derived_dataset_metadata=True)
+
+    def generate_across_apps(
+        self, project_id, apps, output_dir=None, use_cloud_function=True
+    ):
+        """Generate a query across all apps."""
+        # logic for implementing cross-app queries needs to be implemented in the
+        # individual classes
+        return

@@ -0,0 +1,180 @@
"""Generate Materialized Views and aggregate queries for event monitoring."""

import os
from collections import namedtuple
from datetime import datetime
from pathlib import Path

from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
from sql_generators.glean_usage.common import (
    GleanTable,
    get_app_info,
    get_table_dir,
    render,
    table_names_from_baseline,
    write_sql,
)

TARGET_TABLE_ID = "event_monitoring_live_v1"
TARGET_DATASET_CROSS_APP = "monitoring"
PREFIX = "event_monitoring"
PATH = Path(os.path.dirname(__file__))


class EventMonitoringLive(GleanTable):
    """Represents the generated materialized view for event monitoring."""

    def __init__(self) -> None:
        """Initialize materialized view generation."""
        self.no_init = False
        self.per_app_id_enabled = True
        self.per_app_enabled = False
        self.across_apps_enabled = True
        self.prefix = PREFIX
        self.target_table_id = TARGET_TABLE_ID
        self.custom_render_kwargs = {}

    def generate_per_app_id(
        self, project_id, baseline_table, output_dir=None, use_cloud_function=True
    ):
        tables = table_names_from_baseline(baseline_table, include_project_id=False)

        init_filename = f"{self.target_table_id}.init.sql"
        metadata_filename = f"{self.target_table_id}.metadata.yaml"

        table = tables[f"{self.prefix}"]
        dataset = tables[self.prefix].split(".")[-2].replace("_derived", "")

        render_kwargs = dict(
            header="-- Generated via bigquery_etl.glean_usage\n",
            header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
            project_id=project_id,
            derived_dataset=tables[self.prefix].split(".")[-2],
            dataset=dataset,
            current_date=datetime.today().strftime("%Y-%m-%d"),
            app_name=[
                app_dataset["canonical_app_name"]
                for _, app in get_app_info().items()
                for app_dataset in app
                if dataset == app_dataset["bq_dataset_family"]
            ][0],
        )

        render_kwargs.update(self.custom_render_kwargs)
        render_kwargs.update(tables)

        # generated files to update
        Artifact = namedtuple("Artifact", "table_id basename sql")
        artifacts = []

        if not self.no_init:
            init_sql = render(
                init_filename, template_folder=PATH / "templates", **render_kwargs
            )
            metadata = render(
                metadata_filename,
                template_folder=PATH / "templates",
                format=False,
                **render_kwargs,
            )
            artifacts.append(Artifact(table, "metadata.yaml", metadata))

        skip_existing_artifact = self.skip_existing(output_dir, project_id)

        if output_dir:
            if not self.no_init:
                artifacts.append(Artifact(table, "init.sql", init_sql))

            for artifact in artifacts:
                destination = (
                    get_table_dir(output_dir, artifact.table_id) / artifact.basename
                )
                skip_existing = str(destination) in skip_existing_artifact

                write_sql(
                    output_dir,
                    artifact.table_id,
                    artifact.basename,
                    artifact.sql,
                    skip_existing=skip_existing,
                )

    def generate_across_apps(
        self, project_id, apps, output_dir=None, use_cloud_function=True
    ):
        """Generate a query across all apps."""
        if not self.across_apps_enabled:
            return

        prod_datasets_with_baseline = [
            s.bq_dataset_family
            for s in get_stable_table_schemas()
            if s.schema_id == "moz://mozilla.org/schemas/glean/ping/1"
            and s.bq_table == "baseline_v1"
        ]

        aggregate_table = "event_monitoring_aggregates_v1"
        target_view_name = "_".join(self.target_table_id.split("_")[:-1])

        render_kwargs = dict(
            header="-- Generated via bigquery_etl.glean_usage\n",
            header_yaml="---\n# Generated via bigquery_etl.glean_usage\n",
            project_id=project_id,
            target_view=f"{TARGET_DATASET_CROSS_APP}.{target_view_name}",
            table=target_view_name,
            target_table=f"{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}",
            apps=apps,
            prod_datasets=prod_datasets_with_baseline,
        )
        render_kwargs.update(self.custom_render_kwargs)

        skip_existing_artifacts = self.skip_existing(output_dir, project_id)

        Artifact = namedtuple("Artifact", "table_id basename sql")

        query_filename = f"{aggregate_table}.query.sql"
        query_sql = render(
            query_filename, template_folder=PATH / "templates", **render_kwargs
        )
        metadata = render(
            f"{aggregate_table}.metadata.yaml",
            template_folder=PATH / "templates",
            format=False,
            **render_kwargs,
        )
        table = f"{project_id}.{TARGET_DATASET_CROSS_APP}_derived.{aggregate_table}"

        view_sql = render(
            "event_monitoring_live.view.sql",
            template_folder=PATH / "templates",
            **render_kwargs,
        )
        view_metadata = render(
            "event_monitoring_live.metadata.yaml",
            template_folder=PATH / "templates",
            format=False,
            **render_kwargs,
        )

        view = f"{project_id}.{TARGET_DATASET_CROSS_APP}.{target_view_name}"
        if output_dir:
            artifacts = [
                Artifact(table, "metadata.yaml", metadata),
                Artifact(table, "query.sql", query_sql),
                Artifact(view, "metadata.yaml", view_metadata),
                Artifact(view, "view.sql", view_sql),
            ]

            for artifact in artifacts:
                destination = (
                    get_table_dir(output_dir, artifact.table_id) / artifact.basename
                )
                skip_existing = destination in skip_existing_artifacts

                write_sql(
                    output_dir,
                    artifact.table_id,
                    artifact.basename,
                    artifact.sql,
                    skip_existing=skip_existing,
                )

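A hedged usage sketch of the new class: this is roughly how the glean_usage generator drives it for a single app id (the baseline table and output directory below are illustrative values, not taken from the change):

from sql_generators.glean_usage.event_monitoring_live import EventMonitoringLive

event_monitoring = EventMonitoringLive()

# Renders event_monitoring_live_v1 init.sql / metadata.yaml for one Glean app id.
# The baseline table name below is just an example.
event_monitoring.generate_per_app_id(
    "moz-fx-data-shared-prod",
    "moz-fx-data-shared-prod.org_mozilla_firefox_stable.baseline_v1",
    output_dir="sql/",
)

# The cross-app aggregate query and monitoring view are generated separately via
# event_monitoring.generate_across_apps(project_id, app_info, output_dir="sql/").
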
@@ -0,0 +1,26 @@
-- Generated via ./bqetl generate glean_usage
CREATE OR REPLACE VIEW
  `{{ project_id }}.{{ target_view }}`
AS
{% for (dataset, channel) in datasets -%}
{% if not loop.first -%}
UNION ALL
{% endif -%}
SELECT
  {% if app_name == "fenix" -%}
  mozfun.norm.fenix_app_info("{{ dataset }}", client_info.app_build).channel AS normalized_channel,
  {% elif datasets|length > 1 -%}
  "{{ channel }}" AS normalized_channel,
  {% endif -%}
  normalized_app_name,
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  version,
  total_events
FROM
  `{{ project_id }}.{{ dataset }}_derived.event_monitoring_live_v1`
{% endfor %}

@@ -0,0 +1,21 @@
friendly_name: Event Monitoring Aggregates
description: |-
  Materialized view of experimentation related events
  coming from all Glean apps.
owners:
  - ascholtz@mozilla.com
  - akomar@mozilla.com
labels:
  incremental: true
scheduling:
  dag_name: bqetl_monitoring
bigquery:
  time_partitioning:
    type: day
    field: submission_date
    require_partitions_filter: false
  clustering:
    fields:
      - event_name
      - normalized_channel
      - normalized_app_name

@@ -0,0 +1,112 @@
-- Generated via ./bqetl generate glean_usage
{% for app in apps %}
{% set outer_loop = loop -%}
{% for dataset in app -%}
{% if dataset['bq_dataset_family'] not in ["telemetry", "accounts_frontend", "accounts_backend"] %}
{% if not outer_loop.first -%}
UNION ALL
{% endif -%}
SELECT
  @submission_date AS submission_date,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    -- Aggregates event counts over 30-minute intervals
    INTERVAL(
      DIV(
        EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
        60
      ) * 60
    ) MINUTE
  ) AS window_start,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    INTERVAL(
      (
        DIV(
          EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
          60
        ) + 1
      ) * 60
    ) MINUTE
  ) AS window_end,
  event.category AS event_category,
  event.name AS event_name,
  event_extra.key AS event_extra_key,
  normalized_country_code AS country,
  "{{ dataset['canonical_app_name'] }}" AS normalized_app_name,
  {% if app_name == "fenix" -%}
  mozfun.norm.fenix_app_info("{{ dataset['bq_dataset_family'] }}", app_build).channel AS normalized_channel,
  {% else %}
  "{{ dataset.get('app_channel', 'release') }}" AS normalized_channel,
  {% endif %}
  client_info.app_display_version AS version,
  COUNT(*) AS total_events
FROM
  `{{ project_id }}.{{ dataset['bq_dataset_family'] }}_stable.events_v1`
CROSS JOIN
  UNNEST(events) AS event,
  UNNEST(event.extra) AS event_extra
WHERE DATE(submission_timestamp) = @submission_date
GROUP BY
  submission_date,
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  normalized_app_name,
  normalized_channel,
  version
{% elif dataset in ["accounts_frontend", "accounts_backend"] %}
{% if not outer_loop.first -%}
UNION ALL
{% endif -%}
-- FxA uses custom pings to send events without a category and extras.
SELECT
  @submission_date AS submission_date,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    -- Aggregates event counts over 30-minute intervals
    INTERVAL(
      DIV(
        EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
        60
      ) * 60
    ) MINUTE
  ) AS window_start,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    INTERVAL(
      (
        DIV(
          EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
          60
        ) + 1
      ) * 60
    ) MINUTE
  ) AS window_end,
  NULL AS event_category,
  metrics.string.event_name,
  NULL AS event_extra_key,
  normalized_country_code AS country,
  "{{ dataset['canonical_app_name'] }}" AS normalized_app_name,
  normalized_channel,
  client_info.app_display_version AS VERSION,
  COUNT(*) AS total_events
FROM
  `{{ project_id }}.{{ dataset['bq_dataset_family'] }}_stable.accounts_events_v1`
WHERE DATE(submission_timestamp) = @submission_date
GROUP BY
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  normalized_app_name,
  normalized_channel,
  version
{% endif %}
{% endfor %}
{% endfor %}

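The window boundaries in the query above are derived purely from the minute of the ping's start_time. A Python mirror of just that arithmetic, handy for sanity-checking a single timestamp (this reimplements only the DIV/EXTRACT expressions, nothing else from the query):

from datetime import datetime, timedelta

def event_window(start_time: datetime):
    """Mirror of the SQL window_start / window_end expressions above."""
    hour = start_time.replace(minute=0, second=0, microsecond=0)
    bucket = (start_time.minute // 60) * 60              # DIV(EXTRACT(MINUTE ...), 60) * 60
    window_start = hour + timedelta(minutes=bucket)
    window_end = hour + timedelta(minutes=bucket + 60)   # (DIV(...) + 1) * 60
    return window_start, window_end

start, end = event_window(datetime(2023, 1, 15, 14, 42))
assert (start, end) == (datetime(2023, 1, 15, 14, 0), datetime(2023, 1, 15, 15, 0))
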
@@ -0,0 +1,6 @@
friendly_name: Event Monitoring Live
description: |-
  View that combines live and aggregated event monitoring data for {{ dataset_id }}
owners:
  - ascholtz@mozilla.com
  - akomar@mozilla.com

@@ -0,0 +1,39 @@
CREATE OR REPLACE VIEW `{{ project_id }}.{{ target_view }}` AS
{% for app in apps %}
{% set outer_loop = loop -%}
{% for dataset in app -%}
{% if dataset['bq_dataset_family'] in prod_datasets %}
SELECT
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  normalized_app_name,
  normalized_channel,
  version,
  total_events
FROM
  `{{ project_id }}.{{ dataset['bq_dataset_family'] }}_derived.event_monitoring_live_v1`
WHERE
  submission_date > DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)
UNION ALL
{% endif %}
{% endfor %}
{% endfor %}
SELECT
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  normalized_app_name,
  normalized_channel,
  version,
  total_events
FROM
  `{{ project_id }}.{{ target_table }}`
WHERE
  submission_date <= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)

@@ -0,0 +1,117 @@
CREATE MATERIALIZED VIEW
IF
  NOT EXISTS `{{ project_id }}.{{ derived_dataset }}.event_monitoring_live_v1`
OPTIONS
  (enable_refresh = TRUE, refresh_interval_minutes = 60) AS
{% if dataset_id not in ["telemetry", "accounts_frontend", "accounts_backend"] %}
SELECT
  DATE(submission_timestamp) AS submission_date,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(
      TIMESTAMP_ADD(
        SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time),
        INTERVAL event.timestamp MILLISECOND
      ),
      HOUR
    ),
    -- Aggregates event counts over 30-minute intervals
    INTERVAL(
      DIV(
        EXTRACT(
          MINUTE
          FROM
            TIMESTAMP_ADD(
              SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time),
              INTERVAL event.timestamp MILLISECOND
            )
        ),
        60
      ) * 60
    ) MINUTE
  ) AS window_start,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(
      TIMESTAMP_ADD(
        SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time),
        INTERVAL event.timestamp MILLISECOND
      ),
      HOUR
    ),
    INTERVAL(
      (
        DIV(
          EXTRACT(
            MINUTE
            FROM
              TIMESTAMP_ADD(
                SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time),
                INTERVAL event.timestamp MILLISECOND
              )
          ),
          60
        ) + 1
      ) * 60
    ) MINUTE
  ) AS window_end,
  event.category AS event_category,
  event.name AS event_name,
  event_extra.key AS event_extra_key,
  normalized_country_code AS country,
  '{{ app_name }}' AS normalized_app_name,
  normalized_channel,
  client_info.app_display_version AS version,
  COUNT(*) AS total_events
FROM
  `{{ project_id }}.{{ dataset }}_live.events_v1`
CROSS JOIN
  UNNEST(events) AS event,
  UNNEST(event.extra) AS event_extra
{% elif dataset_id in ["accounts_frontend", "accounts_backend"] %}
-- FxA uses custom pings to send events without a category and extras.
SELECT
  DATE(submission_timestamp) AS submission_date,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    -- Aggregates event counts over 30-minute intervals
    INTERVAL(
      DIV(
        EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
        60
      ) * 60
    ) MINUTE
  ) AS window_start,
  TIMESTAMP_ADD(
    TIMESTAMP_TRUNC(SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time), HOUR),
    INTERVAL(
      (
        DIV(
          EXTRACT(MINUTE FROM SAFE.PARSE_TIMESTAMP('%FT%H:%M%Ez', ping_info.start_time)),
          60
        ) + 1
      ) * 60
    ) MINUTE
  ) AS window_end,
  NULL AS event_category,
  metrics.string.event_name,
  NULL AS event_extra_key,
  normalized_country_code AS country,
  '{{ app_name }}' AS normalized_app_name,
  normalized_channel,
  client_info.app_display_version AS VERSION,
  COUNT(*) AS total_events
FROM
  `{{ project_id }}.{{ dataset }}_live.accounts_events_v1`
{% endif %}
WHERE
  DATE(submission_timestamp) >= "{{ current_date }}"
GROUP BY
  submission_date,
  window_start,
  window_end,
  event_category,
  event_name,
  event_extra_key,
  country,
  normalized_app_name,
  normalized_channel,
  version

@@ -0,0 +1,9 @@
friendly_name: Event Monitoring Live
description: |-
  Materialized view of experimentation related events
  coming from {{ dataset_id }}.
owners:
  - ascholtz@mozilla.com
  - akomar@mozilla.com
labels:
  materialized_view: true

@@ -2,8 +2,9 @@
 import os
 from enum import Enum
 from pathlib import Path
-import yaml

 import click
+import yaml
 from jinja2 import Environment, FileSystemLoader

 from bigquery_etl.cli.utils import use_cloud_function_option

@@ -41,11 +42,11 @@ def generate(target_project, output_dir, use_cloud_function):
     """
     with open(THIS_PATH / "templates/templating.yaml", "r") as f:
         template_config = yaml.safe_load(f)

     output_dir = Path(output_dir) / target_project
     for query, args in template_config["queries"].items():
         template_query_dir = THIS_PATH / "templates" / query
-        env = Environment(loader=FileSystemLoader(str(THIS_PATH / "templates"/query)))
+        env = Environment(loader=FileSystemLoader(str(THIS_PATH / "templates" / query)))
         query_template = env.get_template("query.sql")
         metadata_template = "metadata.yaml"
         view_template = env.get_template("view.sql")

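The reformatted line builds a per-query Jinja environment rooted at that query's template directory. A minimal sketch of the rendering flow (the directory name and render variables are hypothetical):

from pathlib import Path

from jinja2 import Environment, FileSystemLoader

THIS_PATH = Path(".")          # stand-in for the generator module's directory
query = "example_query"        # hypothetical template sub-directory

env = Environment(loader=FileSystemLoader(str(THIS_PATH / "templates" / query)))
query_template = env.get_template("query.sql")
rendered_sql = query_template.render(project_id="moz-fx-data-shared-prod")
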
@@ -270,7 +270,9 @@ def write_view_if_not_exists(target_project: str, sql_dir: Path, schema: SchemaF
     try:
         content = VIEW_CREATE_REGEX.sub("", target_file.read_text())
         content += " WHERE DATE(submission_timestamp) = '2020-01-01'"
-        view_schema = Schema.from_query_file(target_file, content=content, sql_dir=sql_dir)
+        view_schema = Schema.from_query_file(
+            target_file, content=content, sql_dir=sql_dir
+        )

         stable_table_schema = Schema.from_json({"fields": schema.schema})
