Update scalar metric SQL template

Anna Scholtz 2022-03-10 13:49:23 -08:00
Parent 9e2456066e
Commit 227c1cea07
9 changed files with 159 additions and 134 deletions

View file

@@ -30,6 +30,9 @@ class DataSource:
name: str
from_expression: str
submission_date_column: str
build_id_column: str
client_id_column: str
@attr.s(auto_attribs=True)

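The three new DataSource fields let each data source supply its own SQL expressions for the date, build, and client columns instead of relying on hard-coded telemetry defaults. A minimal sketch of how a resolved instance would feed the query template (table name and expressions are illustrative, not from this commit):

import attr

@attr.s(auto_attribs=True)
class DataSource:
    name: str
    from_expression: str
    submission_date_column: str
    build_id_column: str
    client_id_column: str

# Hypothetical source whose date lives in a TIMESTAMP column.
source = DataSource(
    name="main",
    from_expression="mozdata.telemetry.main",
    submission_date_column="submission_timestamp",
    build_id_column="SAFE.SUBSTR(application.build_id, 0, 8)",
    client_id_column="client_id",
)
select_clause = (
    f"DATE({source.submission_date_column}) AS submission_date, "
    f"{source.client_id_column} AS client_id, "
    f"{source.build_id_column} AS build_id"
)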
View file

@@ -30,6 +30,7 @@ class BigQueryClient:
write_disposition: Optional[bigquery.job.WriteDisposition] = None,
clustering: Optional[List[str]] = None,
time_partitioning: Optional[str] = None,
partition_expiration_ms: Optional[int] = None,
) -> None:
dataset = bigquery.dataset.DatasetReference.from_string(
self.dataset,
@@ -51,7 +52,12 @@ class BigQueryClient:
kwargs["clustering_fields"] = clustering
if time_partitioning:
kwargs["time_partitioning"] = bigquery.TimePartitioning(field="submission_date")
if partition_expiration_ms:
kwargs["time_partitioning"] = bigquery.TimePartitioning(
field=time_partitioning, expiration_ms=partition_expiration_ms
)
else:
kwargs["time_partitioning"] = bigquery.TimePartitioning(field=time_partitioning)
config = bigquery.job.QueryJobConfig(default_dataset=dataset, **kwargs)
job = self.client.query(query, config)

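Note that partition_expiration_ms is forwarded straight to google-cloud-bigquery's TimePartitioning, so BigQuery drops expired partitions itself and no separate ALTER TABLE is needed. A sketch of the resulting call, assuming default credentials and a placeholder destination table:

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.job.QueryJobConfig(
    destination="my-project.my_dataset.my_table",  # placeholder table id
    write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
    clustering_fields=["build_id"],
    time_partitioning=bigquery.TimePartitioning(
        field="submission_date",
        expiration_ms=432000000,  # partitions expire after 5 days
    ),
)
query = "SELECT CURRENT_DATE() AS submission_date, '20220310' AS build_id"
client.query(query, job_config=job_config).result()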
View file

@@ -17,10 +17,19 @@ class DataSourceDefinition:
name: str # implicit in configuration
from_expression: str
submission_date_column: str = "submission_date"
build_id_column: str = "SAFE.SUBSTR(application.build_id, 0, 8)"
client_id_column: str = "client_id"
def resolve(self, spec: "MonitoringSpec") -> DataSource:
"""Create the `DataSource` representation."""
params: Dict[str, Any] = {"name": self.name, "from_expression": self.from_expression}
params: Dict[str, Any] = {
"name": self.name,
"from_expression": self.from_expression,
"submission_date_column": self.submission_date_column,
"build_id_column": self.build_id_column,
"client_id_column": self.client_id_column
}
return DataSource(**params)

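With these defaults in place, a minimal definition still resolves to a fully populated DataSource. A usage sketch, assuming a MonitoringSpec instance named spec is in scope:

definition = DataSourceDefinition(
    name="main",
    from_expression="mozdata.telemetry.main",
)
data_source = definition.resolve(spec)  # spec is unused for these fields
assert data_source.submission_date_column == "submission_date"
assert data_source.build_id_column == "SAFE.SUBSTR(application.build_id, 0, 8)"
assert data_source.client_id_column == "client_id"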
View file

@@ -9,7 +9,7 @@ from typing import Any, Dict, Optional
import attr
from jinja2 import Environment, FileSystemLoader
from . import Channel
from . import Channel, MonitoringPeriod
from .bigquery_client import BigQueryClient
from .config import MonitoringConfiguration
from .logging import LogConfiguration
@@ -21,6 +21,7 @@ QUERY_FILENAME = "{}_query.sql"
VIEW_FILENAME = "{}_view.sql"
TEMPLATE_FOLDER = PATH / "templates"
DATA_TYPES = {"histogram", "scalar"} # todo: enum
DEFAULT_PARTITION_EXPIRATION = 432000000 # 5 days
# See https://github.com/mozilla/glam/issues/1575#issuecomment-946880387
# for reference of where these numbers come from.
@@ -85,11 +86,24 @@ class Monitoring:
# if the data is for a build over build analysis but the entire table is
# replaced if it's a submission date analysis.
# group probes that are part of the same dataset
# necessary for creating the SQL template
probes_per_dataset = {}
for probe in self.config.probes:
if probe.data_source.name not in probes_per_dataset:
probes_per_dataset[probe.data_source.name] = [probe]
else:
probes_per_dataset[probe.data_source.name].append(probe)
render_kwargs = {
"header": "-- Generated via opmon\n",
"gcp_project": self.project,
"submission_date": submission_date,
"config": self.project,
"dataset": self.dataset,
"first_run": True, # todo: check if table exists
"dimensions": self.config.dimensions,
"branches": self.config.project.population.branches,
"channel": str(self.config.project.population.channel),
"user_count_threshold": USERS_PER_BUILD_THRESHOLDS[
@@ -100,15 +114,23 @@ class Monitoring:
else None,
"xaxis": str(self.config.project.xaxis),
"start_date": self.config.project.start_date.strftime("%Y-%m-%d"),
"data_source": self.config.project.population.data_source,
"probes": probes,
"population_source": self.config.project.population.data_source,
"probes_per_dataset": probes_per_dataset,
"slug": self.slug,
}
partition_expiration_ms = None
if self.config.project.xaxis != MonitoringPeriod.DAY:
partition_expiration_ms = DEFAULT_PARTITION_EXPIRATION
sql_filename = QUERY_FILENAME.format(data_type)
sql = self._render_sql(sql_filename, render_kwargs)
self.bigquery.execute(
sql, destination_table, clustering=["build_id"], time_partitioning="submission_date"
sql,
destination_table,
clustering=["build_id"],
time_partitioning="submission_date",
partition_expiration_ms=partition_expiration_ms,
)
self._publish_view(data_type)
@@ -125,6 +147,7 @@ class Monitoring:
"gcp_project": self.project,
"dataset": self.dataset,
"slug": self.slug,
"start_date": self.config.project.start_date.strftime("%Y-%m-%d"),
}
sql = self._render_sql(sql_filename, render_kwargs)
self.bigquery.execute(sql)

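The grouping loop above is a plain bucket-by-key pass so the template can emit one merged_scalars_<data_source> CTE per table and scan each table only once. A standalone sketch with stand-in probe objects:

from collections import defaultdict
from types import SimpleNamespace

probes = [
    SimpleNamespace(name="gc_ms", data_source=SimpleNamespace(name="main")),
    SimpleNamespace(name="cpu_count", data_source=SimpleNamespace(name="main")),
    SimpleNamespace(name="uri_count", data_source=SimpleNamespace(name="baseline")),
]
probes_per_dataset = defaultdict(list)
for probe in probes:
    probes_per_dataset[probe.data_source.name].append(probe)
# {"main": [gc_ms, cpu_count], "baseline": [uri_count]} -- one CTE per key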
View file

@@ -1,33 +0,0 @@
CREATE TABLE IF NOT EXISTS
`{{gcp_project}}.{{dataset}}.{{slug}}_histogram` (
submission_date DATE,
client_id STRING,
build_id STRING,
branch STRING,
{% for dimension in dimensions %}
{{ dimension.name }} STRING,
{% endfor %}
metrics ARRAY<
STRUCT<
name STRING,
histogram STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
`values` ARRAY<STRUCT<key STRING, value INT64>>
>
>
>)
PARTITION BY submission_date
CLUSTER BY
build_id
OPTIONS
(require_partition_filter = TRUE,
{% if xaxis == "submission_date" %}
partition_expiration_days = NULL
{% else %}
partition_expiration_days = 5
{% endif %}
)

View file

@@ -1,24 +0,0 @@
CREATE TABLE IF NOT EXISTS
`{{gcp_project}}.{{dataset}}.{{slug}}_scalar` (
submission_date DATE,
client_id STRING,
build_id STRING,
branch STRING,
{% for dimension in dimensions %}
{{ dimension.name }} STRING,
{% endfor %}
name STRING,
agg_type STRING,
value FLOAT64
)
PARTITION BY submission_date
CLUSTER BY
build_id
OPTIONS
(require_partition_filter = TRUE,
{% if xaxis == "submission_date" %}
partition_expiration_days = NULL
{% else %}
partition_expiration_days = 5
{% endif %}
)

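Both deleted init templates hard-coded partition_expiration_days = 5; DEFAULT_PARTITION_EXPIRATION expresses the same lifetime in milliseconds, which is what the BigQuery API expects:

# 5 days * 24 h * 60 min * 60 s * 1000 ms
assert 5 * 24 * 60 * 60 * 1000 == 432000000  # DEFAULT_PARTITION_EXPIRATION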
View file

@@ -1,42 +1,52 @@
{{ header }}
WITH merged_scalars AS (
WITH population AS (
SELECT
{% if xaxis == "submission_date" %}
DATE(submission_timestamp) AS submission_date,
{% else %}
@submission_date AS submission_date,
{% endif %}
client_id,
SAFE.SUBSTR(application.build_id, 0, 8) AS build_id,
DATE({{ config.population.data_source.submission_date_column }}) AS submission_date,
{{ config.population.data_source.client_id_column }} AS client_id,
{{ config.population.data_source.build_id_column }} AS build_id,
{% for dimension in dimensions %}
CAST({{ dimension.sql }} AS STRING) AS {{ dimension.name }},
CAST({{ dimension.select_expression }} AS STRING) AS {{ dimension.name }},
{% endfor %}
-- If a pref is defined, treat it as a rollout with an enabled and disabled branch.
-- If branches are provided, use those instead.
-- If neither a pref or branches are available, use the slug and treat it as a rollout
-- where those with the slug have the feature enabled and those without do not.
{% if pref %}
CASE
WHEN SAFE_CAST({{pref}} as BOOLEAN) THEN 'enabled'
WHEN NOT SAFE_CAST({{pref}} as BOOLEAN) THEN 'disabled'
END
AS branch,
{% elif branches %}
{% if config.population.branches != [] %}
mozfun.map.get_key(
environment.experiments,
"{{slug}}"
"{{ slug }}"
).branch AS branch,
{% elif config.population.boolean_pref %}
CASE
WHEN SAFE_CAST({{ config.population.boolean_pref }} as BOOLEAN) THEN 'enabled'
WHEN NOT SAFE_CAST({{ config.population.boolean_pref }} as BOOLEAN) THEN 'disabled'
END
AS branch,
{% else %}
CASE WHEN
mozfun.map.get_key(
environment.experiments,
"{{slug}}"
).branch IS NULL THEN 'disabled'
ELSE 'enabled'
END AS branch,
NULL AS branch,
{% endif %}
FROM
`{{ config.population.data_source.from_expression }}`
WHERE
DATE({{ config.population.data_source.submission_date_column }}) = '{{ submission_date }}'
AND normalized_channel = '{{ config.population.channel }}'
GROUP BY
submission_date,
client_id,
build_id,
{% for dimension in dimensions %}
{{ dimension.name }},
{% endfor %}
branch
),
{% for data_source, probes in probes_per_dataset.items() %}
merged_scalars_{{ data_source }} AS (
SELECT
DATE({{ probes[0].data_source.submission_date_column }}) AS submission_date,
{{ probes[0].data_source.client_id_column }} AS client_id,
ARRAY<
STRUCT<
name STRING,
@@ -48,64 +58,65 @@ WITH merged_scalars AS (
(
"{{ probe.name }}",
"MAX",
MAX(CAST({{ probe.sql }} AS INT64))
MAX(CAST({{ probe.select_expression }} AS INT64))
),
(
"{{ probe.name }}",
"SUM",
SUM(CAST({{ probe.sql }} AS INT64))
SUM(CAST({{ probe.select_expression }} AS INT64))
)
{{ "," if not loop.last else "" }}
{% endfor %}
] AS metrics,
FROM
`{{source}}`
`{{ probes[0].data_source.from_expression }}`
WHERE
DATE(submission_timestamp) >= DATE_SUB(@submission_date, INTERVAL 60 DAY)
AND normalized_channel = '{{channel}}'
DATE({{ probes[0].data_source.submission_date_column }}) = '{{ submission_date }}'
GROUP BY
submission_date,
client_id,
build_id,
{% for dimension in dimensions %}
{{ dimension.name }},
{% endfor %}
branch
client_id
),
{% endfor %}
joined_scalars AS (
SELECT
population.submission_date AS submission_date,
population.client_id AS client_id,
population.build_id AS build_id,
{% for dimension in dimensions %}
population.{{ dimension.name }} AS {{ dimension.name }},
{% endfor %}
population.branch AS branch,
ARRAY_CONCAT(
{% for data_source, probes in probes_per_dataset.items() %}
merged_scalars_{{ data_source }}.metrics
{{ "," if not loop.last else "" }}
{% endfor %}
) AS metrics
FROM population
{% for data_source, probes in probes_per_dataset.items() %}
LEFT JOIN merged_scalars_{{ data_source }}
USING(submission_date, client_id)
{% endfor %}
),
flattened_scalars AS (
SELECT *
FROM merged_scalars
FROM joined_scalars
CROSS JOIN UNNEST(metrics)
{% if config.population.branches != [] or config.population.boolean_pref %}
WHERE branch IN (
-- If branches are not defined, assume it's a rollout
-- and fall back to branches labeled as enabled/disabled
{% if branches %}
{% for branch in branches %}
{% if config.population.branches != [] %}
{% for branch in config.population.branches %}
"{{ branch }}"
{{ "," if not loop.last else "" }}
{% endfor %}
{% else %}
{% elif config.population.boolean_pref %}
"enabled", "disabled"
{% endif %}
)
{% endif %}
),
log_min_max AS (
SELECT
name,
LOG(IF(MIN(value) <= 0, 1, MIN(value)), 2) log_min,
LOG(IF(MAX(value) <= 0, 1, MAX(value)), 2) log_max
FROM
flattened_scalars
GROUP BY 1),
buckets_by_metric AS (
SELECT name, ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST(
mozfun.glam.histogram_generate_scalar_buckets(log_min, log_max, 100)
) AS bucket ORDER BY bucket) AS buckets
FROM log_min_max)
{% if first_run or str(config.xaxis) == "submission_date" %}
SELECT
submission_date,
client_id,
@@ -116,9 +127,39 @@ SELECT
branch,
name,
agg_type,
-- Replace value with its bucket value
SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64) AS value
SAFE_CAST(value AS FLOAT64) AS value
FROM
flattened_scalars
LEFT JOIN buckets_by_metric USING(name)
{% else %}
-- if data is aggregated by build ID, then aggregate data with previous runs
SELECT
DATE('{{ submission_date }}') AS submission_date,
IF(_current.client_id IS NOT NULL, _current, _prev).* REPLACE (
IF(_current.agg_type IS NOT NULL,
CASE _current.agg_type
-- SUM and MAX are aggregates; merging two scalar values needs + and GREATEST
WHEN "SUM" THEN SAFE_CAST(_current.value AS FLOAT64) + COALESCE(_prev.value, 0)
WHEN "MAX" THEN GREATEST(SAFE_CAST(_current.value AS FLOAT64), COALESCE(_prev.value, 0))
ELSE SAFE_CAST(_current.value AS FLOAT64)
END,
-- no row in the current run; carry the previous cumulative value forward
SAFE_CAST(_prev.value AS FLOAT64)
) AS value
FROM
flattened_scalars _current
FULL JOIN
`{{ gcp_project }}.{{ dataset }}.{{ slug }}_scalar` _prev
ON
DATE_ADD(_prev.submission_date, INTERVAL 1 DAY) = _current.submission_date AND
_prev.client_id = _current.client_id AND
_prev.build_id = _current.build_id AND
{% for dimension in dimensions %}
_prev.{{ dimension.name }} = _current.{{ dimension.name }} AND
{% endfor %}
_prev.branch = _current.branch AND
_prev.name = _current.name AND
_prev.agg_type = _current.agg_type
WHERE _prev.submission_date = DATE_SUB('{{ submission_date }}', INTERVAL 1 DAY)
{% endif %}

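In the build-over-build branch, the FULL JOIN merges today's partition into yesterday's cumulative row per client, build, dimension, branch, and metric. Restated as plain Python, the per-row rule is roughly this (names illustrative):

def merge_value(curr, prev, agg_type):
    # Rows present on only one side carry that side's value through.
    if prev is None:
        return curr
    if curr is None:
        return prev
    if agg_type == "SUM":
        return curr + prev       # running total across runs
    if agg_type == "MAX":
        return max(curr, prev)   # running maximum across runs
    return curr

assert merge_value(3.0, 4.0, "SUM") == 7.0
assert merge_value(None, 4.0, "MAX") == 4.0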
View file

@@ -1,25 +1,25 @@
CREATE OR REPLACE VIEW
`{{gcp_project}}.operational_monitoring.{{slug}}_scalar`
`{{ gcp_project }}.operational_monitoring.{{ slug }}_scalar`
AS
WITH valid_builds AS (
SELECT build_id
FROM `{{gcp_project}}.{{dataset}}.{{slug}}_scalar`
FROM `{{ gcp_project }}.{{ dataset }}.{{ slug }}_scalar`
WHERE {% include 'where_clause.sql' %}
GROUP BY 1
HAVING COUNT(DISTINCT client_id) >= {{user_count_threshold}}
HAVING COUNT(DISTINCT client_id) >= {{ user_count_threshold }}
),
filtered_scalars AS (
SELECT *
FROM valid_builds
INNER JOIN `{{gcp_project}}.{{dataset}}.{{slug}}_scalar`
INNER JOIN `{{ gcp_project }}.{{ dataset }}.{{ slug }}_scalar`
USING (build_id)
WHERE {% include 'where_clause.sql' %}
)
SELECT
client_id,
{% if xaxis == "submission_date" %}
{% if str(config.xaxis) == "submission_date" %}
submission_date,
{% else %}
build_id,
@@ -37,7 +37,7 @@ SELECT
FROM filtered_scalars
GROUP BY
client_id,
{% if xaxis == "submission_date" %}
{% if str(config.xaxis) == "submission_date" %}
submission_date,
{% else %}
build_id,

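The valid_builds CTE drops builds observed on too few distinct clients before anything is exposed through the view. The same rule in Python, with hypothetical counts and threshold:

client_counts = {"20220308": 1200, "20220309": 80}  # build_id -> distinct clients
user_count_threshold = 100
valid_builds = {b for b, n in client_counts.items() if n >= user_count_threshold}
assert valid_builds == {"20220308"}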
View file

@@ -1,12 +1,12 @@
{% if xaxis == "submission_date" %}
{% if start_date %}
DATE(submission_date) >= "{{start_date}}"
{% if str(config.xaxis) == "submission_date" %}
{% if config.start_date %}
DATE(submission_date) >= "{{ config.start_date }}"
{% else %}
DATE(submission_date) > DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
{% endif %}
{% else %}
{% if start_date %}
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= "{{start_date}}"
{% if config.start_date %}
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= "{{ config.start_date }}"
{% else %}
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) > DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
{% endif %}
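Rendering this include yields one of two predicates depending on the x-axis. A sketch with a flattened stand-in template (the real file resolves config.xaxis and config.start_date):

from jinja2 import Template

where_clause = Template(
    '{% if xaxis == "submission_date" %}'
    'DATE(submission_date) >= "{{ start_date }}"'
    "{% else %}"
    "PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= \"{{ start_date }}\""
    "{% endif %}"
)
print(where_clause.render(xaxis="submission_date", start_date="2022-01-01"))
# DATE(submission_date) >= "2022-01-01"
print(where_clause.render(xaxis="build_id", start_date="2022-01-01"))
# PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= "2022-01-01"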