This commit is contained in:
Anna Scholtz 2022-09-07 13:19:30 -07:00
Родитель 6084fe2731
Коммит 13cdab45a6
11 изменённых файлов: 447 добавлений и 587 удалений

Просмотреть файл

@ -1,8 +1,9 @@
"""OpMon."""
import enum
from typing import List, Optional
from typing import List, Optional, TYPE_CHECKING
from opmon.config import Summary
if TYPE_CHECKING:
from opmon.config import Summary
import attr
@ -85,7 +86,7 @@ class Alert:
name: str
type: AlertType
probes: List[Summary]
probes: List["Summary"]
friendly_name: Optional[str] = None
description: Optional[str] = None
percentiles: List[int] = []

Просмотреть файл

@ -128,7 +128,7 @@ class ProbeDefinition:
description: Optional[str] = None
category: Optional[str] = None
type: Optional[str] = None
statistics: Optional[Dict[str, Dict[str, Any]]] = None
statistics: Optional[Dict[str, Dict[str, Any]]] = {"percentile": {}} # todo: remove default?
def resolve(self, spec: "MonitoringSpec") -> List[Summary]:
"""Create and return a `Probe` instance from this definition."""
@ -157,12 +157,13 @@ class ProbeDefinition:
stats_params = copy.deepcopy(params)
summaries.append(
Summary(
metric=probe,
statistic=statistic.from_dict(stats_params),
for stat in statistic.from_dict(stats_params).computation(probe):
summaries.append(
Summary(
metric=probe,
statistic=stat,
)
)
)
return summaries

Просмотреть файл

@ -31,3 +31,11 @@ class ConfigurationException(OpmonException):
def __init__(self, slug, message="Project has been incorrectly configured."):
"""Initialize exception."""
super().__init__(f"{slug} -> {message}")
class StatisticNotImplementedForTypeException(OpmonException):
"""Exception thrown when statistic is not implemented for metric type."""
def __init__(self, slug, message="Statistic not implemented for metric type."):
"""Initialize exception."""
super().__init__(f"{slug} -> {message}")

Просмотреть файл

@ -30,7 +30,12 @@ class ExternalConfig:
def validate(self, experiment: Optional[experimenter.Experiment] = None) -> None:
"""Validate the external config."""
conf = self.spec.resolve(experiment)
Monitoring(project="project", dataset="dataset", slug=self.slug, config=conf).validate()
Monitoring(
project="moz-fx-data-shared-prod",
dataset="operational_monitoring",
slug=self.slug,
config=conf,
).validate()
def entity_from_path(path: Path) -> ExternalConfig:

Просмотреть файл

@ -18,8 +18,7 @@ from .utils import bq_normalize_name
PATH = Path(os.path.dirname(__file__))
QUERY_FILENAME = "{}_query.sql"
VIEW_FILENAME = "metric_view.sql"
QUERY_FILENAME = "metric_query.sql"
ALERTS_FILENAME = "alerts_view.sql"
STATISTICS_FILENAME = "statistics.sql"
TEMPLATE_FOLDER = PATH / "templates"
@ -54,12 +53,8 @@ class Monitoring:
def run(self, submission_date):
"""Execute and generate the operational monitoring ETL for a specific date."""
for data_type in DATA_TYPES:
# Periodically print so airflow gke operator doesn't think task is dead
print(f"Run query for {self.slug} for {data_type} types")
self._run_sql_for_data_type(submission_date, data_type)
print(f"Create view for {self.slug}")
self.bigquery.execute(self._get_view_sql())
print(f"Run metrics query for {self.slug}")
self.bigquery.execute(self._run_metrics_sql(submission_date))
print("Calculate statistics")
self.bigquery.execute(self._get_statistics_sql(submission_date))
@ -68,7 +63,7 @@ class Monitoring:
self._run_sql_for_alerts(submission_date)
return True
def _run_sql_for_data_type(self, submission_date: datetime, data_type: str):
def _run_metrics_sql(self, submission_date: datetime):
"""Generate and execute the ETL for a specific data type."""
try:
self._check_runnable(submission_date)
@ -77,10 +72,10 @@ class Monitoring:
return
date_partition = str(submission_date).replace("-", "").split(" ")[0]
destination_table = f"{self.normalized_slug}_{data_type}${date_partition}"
destination_table = f"{self.normalized_slug}${date_partition}"
self.bigquery.execute(
self._get_data_type_sql(submission_date=submission_date, data_type=data_type),
self._get_metrics_sql(submission_date=submission_date),
destination_table,
clustering=["build_id"],
time_partitioning="submission_date",
@ -95,28 +90,19 @@ class Monitoring:
sql = template.render(**render_kwargs)
return sql
def _get_data_type_sql(
self, submission_date: datetime, data_type: str, first_run: Optional[bool] = None
def _get_metrics_sql(
self, submission_date: datetime, first_run: Optional[bool] = None
) -> str:
"""Return SQL for data_type ETL."""
probes = self.config.probes
probes = [probe for probe in probes if probe.metric.type == data_type]
if len(probes) == 0:
# There are no probes for this data source + data type combo
logger.warning(
f"No probes for data type {data_type} configured for {self.slug}.",
f"No metrics configured for {self.slug}.",
extra={"experiment": self.slug},
)
# todo:
# xaxis metadata to be used to decide whether the entire table is replaced
# Or just a partition.
#
# Note: there is a subtle design here in which date partitions are replaced
# if the data is for a build over build analysis but the entire table is
# replaced if it's a submission date analysis.
# group probes that are part of the same dataset
# necessary for creating the SQL template
metrics_per_dataset = {}
@ -124,13 +110,14 @@ class Monitoring:
if probe.metric.data_source.name not in metrics_per_dataset:
metrics_per_dataset[probe.metric.data_source.name] = [probe.metric]
else:
metrics_per_dataset[probe.metric.data_source.name].append(probe.metric)
if probe.metric not in metrics_per_dataset[probe.metric.data_source.name]:
metrics_per_dataset[probe.metric.data_source.name].append(probe.metric)
# check if this is the first time the queries are executed
# the queries are referencing the destination table if build_id is used for the time frame
if first_run is None:
destination_table = (
f"{self.project}.{self.dataset}_derived.{self.normalized_slug}_{data_type}"
f"{self.project}.{self.dataset}_derived.{self.normalized_slug}"
)
first_run = True
try:
@ -147,30 +134,15 @@ class Monitoring:
"dataset": self.dataset,
"first_run": first_run,
"dimensions": self.config.dimensions,
# "user_count_threshold": USERS_PER_BUILD_THRESHOLDS[
# self.config.project.population.channel
# ],
"metrics_per_dataset": metrics_per_dataset,
"slug": self.slug,
"normalized_slug": self.normalized_slug,
}
sql_filename = QUERY_FILENAME.format(data_type)
sql_filename = QUERY_FILENAME
sql = self._render_sql(sql_filename, render_kwargs)
return sql
def _get_view_sql(self) -> str:
"""Return the SQL to create a BigQuery view."""
render_kwargs = {
"gcp_project": self.project,
"dataset": self.dataset,
"config": self.config.project,
"normalized_slug": self.normalized_slug,
"dimensions": self.config.dimensions,
}
sql = self._render_sql(VIEW_FILENAME, render_kwargs)
return sql
def _get_statistics_sql(self, submission_date) -> str:
"""Return the SQL to run the statistics."""
render_kwargs = {
@ -179,7 +151,8 @@ class Monitoring:
"config": self.config.project,
"normalized_slug": self.normalized_slug,
"dimensions": self.config.dimensions,
"probes": self.config.probes,
"summaries": self.config.probes,
"submission_date": submission_date,
}
sql = self._render_sql(STATISTICS_FILENAME, render_kwargs)
return sql
@ -237,11 +210,18 @@ class Monitoring:
"""Validate ETL and configs of opmon project."""
self._check_runnable()
for data_type in DATA_TYPES:
data_type_sql = self._get_data_type_sql(
submission_date=self.config.project.start_date, # type: ignore
data_type=data_type,
first_run=True,
)
dry_run_query(data_type_sql)
print(data_type_sql)
metrics_sql = self._get_metrics_sql(
submission_date=self.config.project.start_date, # type: ignore
first_run=True,
)
dry_run_query(metrics_sql)
# print(data_type_sql)
statistics_sql = self._get_statistics_sql(
submission_date=self.config.project.start_date, # type: ignore
)
# print(statistics_sql)
dry_run_query(statistics_sql)
# todo: validate alerts
# todo: update alerts view/query

Просмотреть файл

@ -1,9 +1,12 @@
import re
from abc import ABC, abstractmethod
from abc import ABC
from typing import Any, Dict, List, Optional
import attr
from opmon import Probe
from opmon.errors import StatisticNotImplementedForTypeException
@attr.s(auto_attribs=True)
class StatisticComputation:
@ -31,9 +34,25 @@ class Statistic(ABC):
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", cls.__name__)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()
@abstractmethod
def computation(self, value: str = "values") -> List[StatisticComputation]:
return NotImplemented
def computation(self, metric: Probe) -> List[StatisticComputation]:
if metric.type == "scalar":
return self._scalar_computation(metric)
elif metric.type == "histogram":
return self._histogram_computation(metric)
else:
raise StatisticNotImplementedForTypeException(
f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})"
)
def _scalar_computation(self, metric: Probe) -> List[StatisticComputation]:
raise StatisticNotImplementedForTypeException(
f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})"
)
def _histogram_computation(self, metric: Probe) -> List[StatisticComputation]:
raise StatisticNotImplementedForTypeException(
f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})"
)
@classmethod
def from_dict(cls, config_dict: Dict[str, Any]):
@ -42,30 +61,30 @@ class Statistic(ABC):
class Count(Statistic):
def computation(self, value: str = "values"):
def _scalar_computation(self, metric: Probe):
return [
StatisticComputation(
point=f"COUNT({value})",
point=f"COUNT({metric.name})",
name=self.name(),
)
]
class Sum(Statistic):
def computation(self, value: str = "values"):
def _scalar_computation(self, metric: Probe):
return [
StatisticComputation(
point=f"SUM({value})",
point=f"SUM({metric.name})",
name=self.name(),
)
]
class Mean(Statistic):
def computation(self, value: str = "values"):
def _scalar_computation(self, metric: Probe):
return [
StatisticComputation(
point=f"AVG({value})",
point=f"AVG({metric.name})",
name=self.name(),
)
]
@ -75,11 +94,14 @@ class Quantile(Statistic):
number_of_quantiles: int = 100
quantile: int = 50
def computation(self, value: str = "values"):
def _scalar_computation(self, metric: Probe):
return [
StatisticComputation(
point=f"""
APPROX_QUANTILES({value}, {self.number_of_quantiles})[OFFSET({self.quantile})]
APPROX_QUANTILES(
{metric.name},
{self.number_of_quantiles}
)[OFFSET({self.quantile})]
""",
name=self.name(),
)
@ -90,30 +112,252 @@ class Quantile(Statistic):
class Percentile(Statistic):
percentiles: List[int] = [50, 90, 99]
def computation(self, value: str = "values"):
def _scalar_computation(self, metric: Probe):
return [
StatisticComputation(
point=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
{value}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key FLOAT64, value FLOAT64>
>>(1,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0) + 0.0001
)
AS FLOAT64)
AS INT64),
0),
1,
[
0,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0
) + 0.0001
)
AS FLOAT64)
AS INT64),
0)
],
[
STRUCT<key FLOAT64, value FLOAT64>(
COALESCE(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
),
0) + 0.0001
) AS FLOAT64
), 0.0
), 1
)
]
)
).percentile
""",
lower=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
{value}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key FLOAT64, value FLOAT64>
>>(1,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0) + 0.0001
)
AS FLOAT64)
AS INT64),
0),
1,
[
0,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0
) + 0.0001
)
AS FLOAT64)
AS INT64),
0)
],
[
STRUCT<key FLOAT64, value FLOAT64>(
COALESCE(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
),
0) + 0.0001
) AS FLOAT64
), 0.0
), 1
)
]
)
).low
""",
upper=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
{value}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key FLOAT64, value FLOAT64>
>>(1,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0) + 0.0001
)
AS FLOAT64)
AS INT64),
0),
1,
[
0,
COALESCE(
SAFE_CAST(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
), 0
) + 0.0001
)
AS FLOAT64)
AS INT64),
0)
],
[
STRUCT<key FLOAT64, value FLOAT64>(
COALESCE(
SAFE_CAST(
FORMAT(
"%.*f",
2,
COALESCE(
mozfun.glam.histogram_bucket_from_value(
{metric.name}_buckets,
SAFE_CAST({metric.name} AS FLOAT64)
),
0) + 0.0001
) AS FLOAT64
), 0.0
), 1
)
]
)
).high
""",
name=self.name(),
parameter=str(percentile),
)
for percentile in self.percentiles
]
def _histogram_computation(self, metric: Probe) -> List[StatisticComputation]:
return [
StatisticComputation(
point=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
histogram_normalized_sum(
mozfun.hist.merge(
ARRAY_AGG({metric.name} IGNORE NULLS)
).values, 1.0
)
)
).percentile
""",
lower=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
histogram_normalized_sum(
mozfun.hist.merge(
ARRAY_AGG({metric.name} IGNORE NULLS)
).values, 1.0
)
)
).low
""",
upper=f"""
`moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci(
{percentile},
STRUCT(
histogram_normalized_sum(
mozfun.hist.merge(
ARRAY_AGG({metric.name} IGNORE NULLS)
).values, 1.0
)
)
).high
""",

Просмотреть файл

@ -1,229 +0,0 @@
{{ header }}
{% include 'population.sql' %},
-- for each data source that is used
-- select the metric values
{% for data_source, metrics in metrics_per_dataset.items() -%}
merged_metrics_{{ data_source }} AS (
SELECT
DATE({{ metrics[0].data_source.submission_date_column }}) AS submission_date,
{{ config.population.data_source.client_id_column }} AS client_id,
p.population_build_id AS build_id,
ARRAY<
STRUCT<
metric STRING,
histograms ARRAY<
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
values ARRAY<STRUCT<key INT64, value INT64>>>
>>
>[
{% for metric in metrics %}
(
"{{ metric.name }}",
{{ metric.select_expression }}
)
{{ "," if not loop.last else "" }}
{% endfor %}
] AS metrics,
FROM
{{ metrics[0].data_source.from_expression }}
RIGHT JOIN
(
SELECT
client_id AS population_client_id,
submission_date AS population_submission_date,
build_id AS population_build_id
FROM
population
) AS p
ON
{{ metrics[0].data_source.submission_date_column }} = p.population_submission_date AND
{{ config.population.data_source.client_id_column }} = p.population_client_id
WHERE
{% if config.xaxis.value == "submission_date" %}
DATE({{ metrics[0].data_source.submission_date_column }}) = DATE('{{ submission_date }}')
{% else %}
-- when aggregating by build_id, only use the most recent 14 days of data
DATE({{ metrics[0].data_source.submission_date_column }}) BETWEEN DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) AND DATE('{{ submission_date }}')
{% endif %}
GROUP BY
submission_date,
client_id,
build_id
),
{% endfor %}
-- combine the metrics from all the data sources
joined_histograms AS (
SELECT
population.submission_date AS submission_date,
population.client_id AS client_id,
population.build_id,
{% for dimension in dimensions %}
population.{{ dimension.name }} AS {{ dimension.name }},
{% endfor %}
population.branch AS branch,
{% if metrics_per_dataset != {} %}
ARRAY_CONCAT(
{% for data_source, metrics in metrics_per_dataset.items() %}
merged_metrics_{{ data_source }}.metrics
{% endfor %}
) AS metrics
{% else %}
[] AS metrics,
{% endif %}
FROM population
{% for data_source, metrics in metrics_per_dataset.items() %}
LEFT JOIN merged_metrics_{{ data_source }}
USING(submission_date, client_id)
{% endfor %}
),
-- merge histograms if client has multiple
merged_histograms AS (
SELECT
submission_date,
client_id,
build_id,
branch,
{% for dimension in dimensions %}
{{ dimension.name }},
{% endfor %}
{% if metrics_per_dataset != {} %}
ARRAY_AGG(
STRUCT<
name STRING,
histogram STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
values ARRAY<STRUCT<key INT64, value INT64>>
>
> (
metric,
CASE
WHEN
histograms IS NULL
THEN
NULL
ELSE
mozfun.hist.merge(histograms)
END
)
) AS metrics
{% else %}
[] AS metrics
{% endif %}
FROM
joined_histograms
CROSS JOIN
UNNEST(metrics)
{% if not config.population.monitor_entire_population %}
WHERE branch IN (
-- If branches are not defined, assume it's a rollout
-- and fall back to branches labeled as enabled/disabled
{% if config.population.branches|length > 0 -%}
{% for branch in config.population.branches -%}
"{{ branch }}"
{{ "," if not loop.last else "" }}
{% endfor -%}
{% else -%}
"enabled", "disabled"
{% endif -%}
)
{% endif %}
GROUP BY
submission_date,
client_id,
build_id,
{% for dimension in dimensions %}
{{ dimension.name }},
{% endfor %}
branch
),
-- Cast histograms to have string keys so we can use the histogram normalization function
normalized_histograms AS (
SELECT
submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
{% if metrics_per_dataset != {} %}
name AS metric,
{% else %}
NULL AS metric,
{% endif %}
{% if metrics_per_dataset != {} %}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key STRING, value INT64>>
>(histogram.bucket_count,
histogram.sum,
histogram.histogram_type,
histogram.range,
ARRAY(SELECT AS STRUCT CAST(keyval.key AS STRING), keyval.value FROM UNNEST(histogram.values) keyval)
) AS value
{% else %}
NULL AS value
{% endif %}
FROM merged_histograms
CROSS JOIN UNNEST(metrics)
)
{% if first_run or config.xaxis.value == "submission_date" -%}
SELECT
submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
metric AS probe,
value
FROM
normalized_histograms
{% else -%}
SELECT
DATE('{{ submission_date }}') AS submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
metric AS probe,
value
FROM normalized_histograms _current
WHERE
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY)
UNION ALL
SELECT
DATE('{{ submission_date }}') AS submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
metric AS probe,
value
FROM normalized_histograms _prev
WHERE
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) < DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY)
AND submission_date = DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 1 DAY)
{% endif -%}

Просмотреть файл

@ -5,25 +5,14 @@
-- for each data source that is used
-- select the metric values
{% for data_source, metrics in metrics_per_dataset.items() -%}
merged_scalars_{{ data_source }} AS (
merged_metrics_{{ data_source }} AS (
SELECT
DATE({{ metrics[0].data_source.submission_date_column }}) AS submission_date,
{{ config.population.data_source.client_id_column }} AS client_id,
p.population_build_id AS build_id,
ARRAY<
STRUCT<
name STRING,
value FLOAT64
>
>[
{% for metric in metrics -%}
(
"{{ metric.name }}",
CAST({{ metric.select_expression }} AS FLOAT64)
)
{{ "," if not loop.last else "" }}
{% endfor -%}
] AS metrics,
{% for metric in metrics -%}
{{ metric.select_expression }} AS {{ metric.name }},
{% endfor -%}
FROM
{{ metrics[0].data_source.from_expression }}
RIGHT JOIN
@ -53,7 +42,7 @@ merged_scalars_{{ data_source }} AS (
{% endfor %}
-- combine the metrics from all the data sources
joined_scalars AS (
joined_metrics AS (
SELECT
population.submission_date AS submission_date,
population.client_id AS client_id,
@ -62,25 +51,23 @@ joined_scalars AS (
population.{{ dimension.name }} AS {{ dimension.name }},
{% endfor %}
population.branch AS branch,
ARRAY_CONCAT(
{% for data_source, metrics in metrics_per_dataset.items() -%}
COALESCE(merged_scalars_{{ data_source }}.metrics, [])
{{ "," if not loop.last else "" }}
{% endfor -%}
) AS metrics
{% for data_source, metrics in metrics_per_dataset.items() -%}
{% for metric in metrics -%}
{{ metric.name }},
{% endfor -%}
{% endfor -%}
FROM population
{% for data_source, metrics in metrics_per_dataset.items() -%}
LEFT JOIN merged_scalars_{{ data_source }}
LEFT JOIN merged_metrics_{{ data_source }}
USING(submission_date, client_id, build_id)
{% endfor %}
),
-- unnest the combined metrics so we get
-- the metric values for each client for each date
flattened_scalars AS (
SELECT * EXCEPT(metrics)
FROM joined_scalars
CROSS JOIN UNNEST(metrics)
-- normalize histograms and apply filters
normalized_metrics AS (
SELECT
*
FROM joined_metrics
{% if not config.population.monitor_entire_population %}
WHERE branch IN (
-- If branches are not defined, assume it's a rollout
@ -98,44 +85,20 @@ flattened_scalars AS (
)
{% if first_run or config.xaxis.value == "submission_date" -%}
SELECT
submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
name,
value
*
FROM
flattened_scalars
normalized_metrics
{% else -%}
-- if data is aggregated by build ID, then aggregate data with previous runs
SELECT
DATE('{{ submission_date }}') AS submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
name,
value
FROM flattened_scalars _current
*
FROM normalized_metrics _current
WHERE
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY)
UNION ALL
SELECT
DATE('{{ submission_date }}') AS submission_date,
client_id,
build_id,
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor %}
branch,
name,
value
FROM flattened_scalars _prev
SELECT * REPLACE (DATE('{{ submission_date }}') AS submission_date)
FROM normalized_metrics _prev
WHERE
PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) < DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY)
AND submission_date = DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 1 DAY)

Просмотреть файл

@ -1,167 +0,0 @@
{{ header }}
CREATE OR REPLACE VIEW
`{{ gcp_project }}.{{ dataset }}.{{ normalized_slug }}`
AS
-- Prepare scalar values
WITH filtered_scalars AS (
SELECT *
FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}_scalar`
WHERE {% include 'where_clause.sql' -%}
),
log_min_max AS (
SELECT
name,
LOG(IF(MIN(value) <= 0, 1, MIN(value)), 2) log_min,
LOG(IF(MAX(value) <= 0, 1, MAX(value)), 2) log_max
FROM
filtered_scalars
GROUP BY name),
buckets_by_metric AS (
SELECT
name,
ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST(
mozfun.glam.histogram_generate_scalar_buckets(log_min, log_max, 100)
) AS bucket ORDER BY bucket) AS buckets
FROM log_min_max
),
aggregated_scalars AS (
SELECT
client_id,
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else %}
build_id,
{% endif %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
name,
value
FROM
filtered_scalars
),
-- Prepare histogram values
filtered_histograms AS (
SELECT *
FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}_histogram`
WHERE {% include 'where_clause.sql' -%}
),
normalized_histograms AS (
SELECT
client_id,
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else -%}
build_id,
{% endif -%}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
probe,
{% if metrics_per_dataset != {} %}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key STRING, value FLOAT64>>
>(
ANY_VALUE(value.bucket_count),
ANY_VALUE(value.sum),
ANY_VALUE(value.histogram_type),
ANY_VALUE(value.range),
mozfun.glam.histogram_normalized_sum(
mozfun.hist.merge(ARRAY_AGG(value IGNORE NULLS)).values,
1.0
)
) AS value
{% else %}
NULL AS value
{% endif %}
FROM filtered_histograms
GROUP BY
client_id,
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else -%}
build_id,
{% endif %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
probe)
-- Cast histograms to have FLOAT64 keys
-- so we can use the histogram jackknife percentile function.
SELECT
client_id,
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else -%}
build_id,
{% endif %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
probe AS probe,
{% if metrics_per_dataset != {} %}
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key FLOAT64, value FLOAT64>
>>(value.bucket_count,
value.sum,
value.histogram_type,
value.range,
ARRAY(SELECT AS STRUCT CAST(keyval.key AS FLOAT64), keyval.value FROM UNNEST(value.values) keyval)
) AS value
{% else %}
NULL AS value
{% endif %}
FROM normalized_histograms
UNION ALL
SELECT
client_id,
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else %}
build_id,
{% endif %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
name AS probe,
STRUCT<
bucket_count INT64,
sum INT64,
histogram_type INT64,
`range` ARRAY<INT64>,
VALUES
ARRAY<STRUCT<key FLOAT64, value FLOAT64>
>>(1,
COALESCE(SAFE_CAST(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64) AS INT64), 0),
1,
[0, COALESCE(SAFE_CAST(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64) AS INT64), 0)],
[STRUCT<key FLOAT64, value FLOAT64>(
COALESCE(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64), 0.0), 1
)]
) AS value
FROM
aggregated_scalars
LEFT JOIN buckets_by_metric USING(name)

Просмотреть файл

@ -0,0 +1,40 @@
CREATE TEMPORARY FUNCTION histogram_normalized_sum(
arrs ARRAY<STRUCT<key INT64, value INT64>>,
weight FLOAT64
)
RETURNS ARRAY<STRUCT<key INT64, value FLOAT64>> AS (
-- Input: one histogram for a single client.
-- Returns the normalized sum of the input maps.
-- It returns the total_count[k] / SUM(total_count)
-- for each key k.
(
WITH total_counts AS (
SELECT
sum(a.value) AS total_count
FROM
UNNEST(arrs) AS a
),
summed_counts AS (
SELECT
a.key AS k,
SUM(a.value) AS v
FROM
UNNEST(arrs) AS a
GROUP BY
a.key
)
SELECT
ARRAY_AGG(
STRUCT<key INT64, value FLOAT64>(
k,
COALESCE(SAFE_DIVIDE(1.0 * v, total_count), 0) * weight
)
ORDER BY
SAFE_CAST(k AS INT64)
)
FROM
summed_counts
CROSS JOIN
total_counts
)
);

Просмотреть файл

@ -1,4 +1,39 @@
WITH merged AS (
{{ header }}
{% include 'normalized_sum_udf.sql' %}
WITH filtered_metrics AS (
SELECT *
FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}`
WHERE {% include 'where_clause.sql' -%}
),
-- bucket metrics that use percentile
buckets_by_metric AS (
SELECT
[] AS dummy,
{% set seen_metrics = [] %}
{% for summary in summaries %}
{% if summary.statistic.name == "percentile" %}
{% if summary.metric.type == "scalar" -%}
{% if summary.metric.name not in seen_metrics %}
{% if seen_metrics.append(summary.metric.name) %} {% endif %}
ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST(
mozfun.glam.histogram_generate_scalar_buckets(
LOG(IF(MIN(value) <= 0, 1, MIN({{ summary.metric.name }})), 2),
LOG(IF(MAX(value) <= 0, 1, MAX({{ summary.metric.name }})), 2),
100
)
) AS bucket ORDER BY bucket) AS {{ summary.metric.name }}_buckets,
{% endif %}
{% endif %}
{% endif %}
{% endfor %}
FROM filtered_metrics
),
stats AS (
SELECT
{% if config.xaxis.value == "submission_date" -%}
submission_date,
@ -9,24 +44,37 @@ WITH merged AS (
{{ dimension.name }},
{% endfor -%}
branch,
probe AS metric,
mozfun.hist.merge(ARRAY_AGG(value IGNORE NULLS)).values AS values
ARRAY<STRUCT<
metric STRING,
statistic STRING,
point FLOAT64,
lower FLOAT64,
upper FLOAT64,
parameter STRING
>>[
{% for summary in summaries %}
STRUCT(
'{{ summary.metric.name }}' AS metric,
'{{ summary.statistic.name }}' AS statistic,
{{ summary.statistic.point }} AS point
{% if summary.statistic.lower -%}
,{{ summary.statistic.lower }} AS lower
{% endif -%}
{% if summary.statistic.upper -%}
,{{ summary.statistic.upper }} AS upper
{% endif -%}
{% if summary.statistic.parameter -%}
,'{{ summary.statistic.parameter }}' AS parameter
{% endif -%}
)
{{ "," if not loop.last else "" }}
{% endfor %}
] AS statistics
FROM
`{{ gcp_project }}.{{ dataset }}.{{ normalized_slug }}`
CROSS JOIN buckets_by_metric
WHERE submission_date = DATE("{{ submission_date }}")
GROUP BY
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
build_id,
{% endif %}
branch,
metric
), stats AS (
SELECT
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else %}
@ -35,43 +83,9 @@ WITH merged AS (
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
branch,
metric,
CASE value
{% for probe in probes %}
WHEN probe = "{{ probe.metric.name }}"
THEN ARRAY<STRUCT<>>[(
{% for stat in probe.statistics %}
{{ stat.name }} AS statistic,
{{ stat.point }} AS point,
{% if stat.lower -%}
stat.lower AS lower,
{% endif -%}
{% if stat.upper -%}
stat.upper AS upper,
{% endif -%}
{% if stat.parameter -%}
stat.parameter AS parameter,
{% endif -%}
{% enfor %}
)]
{% endfor %}
ELSE NULL
END AS values
FROM
merged
GROUP BY
{% if config.xaxis.value == "submission_date" -%}
submission_date,
{% else %}
{% for dimension in dimensions -%}
{{ dimension.name }},
{% endfor -%}
build_id,
{% endif %}
branch,
metric
branch
)
SELECT
{% if config.xaxis.value == "submission_date" -%}
submission_date,
@ -82,10 +96,10 @@ SELECT
{{ dimension.name }},
{% endfor -%}
branch,
metric,
statistic.metric AS metric,
statistic.name AS statistic,
statistic.point AS point,
statistic.lower AS lower,
statistic.upper AS upper,
statistic.parameter AS parameter
FROM stats, UNNEST(values) as statistic
FROM stats, UNNEST(statistics) as statistic