From 13cdab45a6a5275dab9b28913fd3e302a1c92d6d Mon Sep 17 00:00:00 2001 From: Anna Scholtz Date: Wed, 7 Sep 2022 13:19:30 -0700 Subject: [PATCH] Support custom statistics --- opmon/__init__.py | 7 +- opmon/config.py | 13 +- opmon/errors.py | 8 + opmon/external_config.py | 7 +- opmon/monitoring.py | 80 ++--- opmon/statistic.py | 282 ++++++++++++++++-- opmon/templates/histogram_query.sql | 229 -------------- .../{scalar_query.sql => metric_query.sql} | 81 ++--- opmon/templates/metric_view.sql | 167 ----------- opmon/templates/normalized_sum_udf.sql | 40 +++ opmon/templates/statistics.sql | 120 ++++---- 11 files changed, 447 insertions(+), 587 deletions(-) delete mode 100644 opmon/templates/histogram_query.sql rename opmon/templates/{scalar_query.sql => metric_query.sql} (65%) delete mode 100644 opmon/templates/metric_view.sql create mode 100644 opmon/templates/normalized_sum_udf.sql diff --git a/opmon/__init__.py b/opmon/__init__.py index ce4b085..46ebcac 100644 --- a/opmon/__init__.py +++ b/opmon/__init__.py @@ -1,8 +1,9 @@ """OpMon.""" import enum -from typing import List, Optional +from typing import List, Optional, TYPE_CHECKING -from opmon.config import Summary +if TYPE_CHECKING: + from opmon.config import Summary import attr @@ -85,7 +86,7 @@ class Alert: name: str type: AlertType - probes: List[Summary] + probes: List["Summary"] friendly_name: Optional[str] = None description: Optional[str] = None percentiles: List[int] = [] diff --git a/opmon/config.py b/opmon/config.py index cd82b4b..fb825cb 100644 --- a/opmon/config.py +++ b/opmon/config.py @@ -128,7 +128,7 @@ class ProbeDefinition: description: Optional[str] = None category: Optional[str] = None type: Optional[str] = None - statistics: Optional[Dict[str, Dict[str, Any]]] = None + statistics: Optional[Dict[str, Dict[str, Any]]] = {"percentile": {}} # todo: remove default? def resolve(self, spec: "MonitoringSpec") -> List[Summary]: """Create and return a `Probe` instance from this definition.""" @@ -157,12 +157,13 @@ class ProbeDefinition: stats_params = copy.deepcopy(params) - summaries.append( - Summary( - metric=probe, - statistic=statistic.from_dict(stats_params), + for stat in statistic.from_dict(stats_params).computation(probe): + summaries.append( + Summary( + metric=probe, + statistic=stat, + ) ) - ) return summaries diff --git a/opmon/errors.py b/opmon/errors.py index c47fbd6..efd6ffb 100644 --- a/opmon/errors.py +++ b/opmon/errors.py @@ -31,3 +31,11 @@ class ConfigurationException(OpmonException): def __init__(self, slug, message="Project has been incorrectly configured."): """Initialize exception.""" super().__init__(f"{slug} -> {message}") + + +class StatisticNotImplementedForTypeException(OpmonException): + """Exception thrown when statistic is not implemented for metric type.""" + + def __init__(self, slug, message="Statistic not implemented for metric type."): + """Initialize exception.""" + super().__init__(f"{slug} -> {message}") diff --git a/opmon/external_config.py b/opmon/external_config.py index 725e0be..e8896a2 100644 --- a/opmon/external_config.py +++ b/opmon/external_config.py @@ -30,7 +30,12 @@ class ExternalConfig: def validate(self, experiment: Optional[experimenter.Experiment] = None) -> None: """Validate the external config.""" conf = self.spec.resolve(experiment) - Monitoring(project="project", dataset="dataset", slug=self.slug, config=conf).validate() + Monitoring( + project="moz-fx-data-shared-prod", + dataset="operational_monitoring", + slug=self.slug, + config=conf, + ).validate() def entity_from_path(path: Path) -> ExternalConfig: diff --git a/opmon/monitoring.py b/opmon/monitoring.py index 6cf2843..3f7e44a 100644 --- a/opmon/monitoring.py +++ b/opmon/monitoring.py @@ -18,8 +18,7 @@ from .utils import bq_normalize_name PATH = Path(os.path.dirname(__file__)) -QUERY_FILENAME = "{}_query.sql" -VIEW_FILENAME = "metric_view.sql" +QUERY_FILENAME = "metric_query.sql" ALERTS_FILENAME = "alerts_view.sql" STATISTICS_FILENAME = "statistics.sql" TEMPLATE_FOLDER = PATH / "templates" @@ -54,12 +53,8 @@ class Monitoring: def run(self, submission_date): """Execute and generate the operational monitoring ETL for a specific date.""" - for data_type in DATA_TYPES: - # Periodically print so airflow gke operator doesn't think task is dead - print(f"Run query for {self.slug} for {data_type} types") - self._run_sql_for_data_type(submission_date, data_type) - print(f"Create view for {self.slug}") - self.bigquery.execute(self._get_view_sql()) + print(f"Run metrics query for {self.slug}") + self.bigquery.execute(self._run_metrics_sql(submission_date)) print("Calculate statistics") self.bigquery.execute(self._get_statistics_sql(submission_date)) @@ -68,7 +63,7 @@ class Monitoring: self._run_sql_for_alerts(submission_date) return True - def _run_sql_for_data_type(self, submission_date: datetime, data_type: str): + def _run_metrics_sql(self, submission_date: datetime): """Generate and execute the ETL for a specific data type.""" try: self._check_runnable(submission_date) @@ -77,10 +72,10 @@ class Monitoring: return date_partition = str(submission_date).replace("-", "").split(" ")[0] - destination_table = f"{self.normalized_slug}_{data_type}${date_partition}" + destination_table = f"{self.normalized_slug}${date_partition}" self.bigquery.execute( - self._get_data_type_sql(submission_date=submission_date, data_type=data_type), + self._get_metrics_sql(submission_date=submission_date), destination_table, clustering=["build_id"], time_partitioning="submission_date", @@ -95,28 +90,19 @@ class Monitoring: sql = template.render(**render_kwargs) return sql - def _get_data_type_sql( - self, submission_date: datetime, data_type: str, first_run: Optional[bool] = None + def _get_metrics_sql( + self, submission_date: datetime, first_run: Optional[bool] = None ) -> str: """Return SQL for data_type ETL.""" probes = self.config.probes - probes = [probe for probe in probes if probe.metric.type == data_type] if len(probes) == 0: # There are no probes for this data source + data type combo logger.warning( - f"No probes for data type {data_type} configured for {self.slug}.", + f"No metrics configured for {self.slug}.", extra={"experiment": self.slug}, ) - # todo: - # xaxis metadata to be used to decide whether the entire table is replaced - # Or just a partition. - # - # Note: there is a subtle design here in which date partitions are replaced - # if the data is for a build over build analysis but the entire table is - # replaced if it's a submission date analysis. - # group probes that are part of the same dataset # necessary for creating the SQL template metrics_per_dataset = {} @@ -124,13 +110,14 @@ class Monitoring: if probe.metric.data_source.name not in metrics_per_dataset: metrics_per_dataset[probe.metric.data_source.name] = [probe.metric] else: - metrics_per_dataset[probe.metric.data_source.name].append(probe.metric) + if probe.metric not in metrics_per_dataset[probe.metric.data_source.name]: + metrics_per_dataset[probe.metric.data_source.name].append(probe.metric) # check if this is the first time the queries are executed # the queries are referencing the destination table if build_id is used for the time frame if first_run is None: destination_table = ( - f"{self.project}.{self.dataset}_derived.{self.normalized_slug}_{data_type}" + f"{self.project}.{self.dataset}_derived.{self.normalized_slug}" ) first_run = True try: @@ -147,30 +134,15 @@ class Monitoring: "dataset": self.dataset, "first_run": first_run, "dimensions": self.config.dimensions, - # "user_count_threshold": USERS_PER_BUILD_THRESHOLDS[ - # self.config.project.population.channel - # ], "metrics_per_dataset": metrics_per_dataset, "slug": self.slug, "normalized_slug": self.normalized_slug, } - sql_filename = QUERY_FILENAME.format(data_type) + sql_filename = QUERY_FILENAME sql = self._render_sql(sql_filename, render_kwargs) return sql - def _get_view_sql(self) -> str: - """Return the SQL to create a BigQuery view.""" - render_kwargs = { - "gcp_project": self.project, - "dataset": self.dataset, - "config": self.config.project, - "normalized_slug": self.normalized_slug, - "dimensions": self.config.dimensions, - } - sql = self._render_sql(VIEW_FILENAME, render_kwargs) - return sql - def _get_statistics_sql(self, submission_date) -> str: """Return the SQL to run the statistics.""" render_kwargs = { @@ -179,7 +151,8 @@ class Monitoring: "config": self.config.project, "normalized_slug": self.normalized_slug, "dimensions": self.config.dimensions, - "probes": self.config.probes, + "summaries": self.config.probes, + "submission_date": submission_date, } sql = self._render_sql(STATISTICS_FILENAME, render_kwargs) return sql @@ -237,11 +210,18 @@ class Monitoring: """Validate ETL and configs of opmon project.""" self._check_runnable() - for data_type in DATA_TYPES: - data_type_sql = self._get_data_type_sql( - submission_date=self.config.project.start_date, # type: ignore - data_type=data_type, - first_run=True, - ) - dry_run_query(data_type_sql) - print(data_type_sql) + metrics_sql = self._get_metrics_sql( + submission_date=self.config.project.start_date, # type: ignore + first_run=True, + ) + dry_run_query(metrics_sql) + # print(data_type_sql) + + statistics_sql = self._get_statistics_sql( + submission_date=self.config.project.start_date, # type: ignore + ) + # print(statistics_sql) + dry_run_query(statistics_sql) + + # todo: validate alerts + # todo: update alerts view/query diff --git a/opmon/statistic.py b/opmon/statistic.py index 61a3e39..e682d0a 100644 --- a/opmon/statistic.py +++ b/opmon/statistic.py @@ -1,9 +1,12 @@ import re -from abc import ABC, abstractmethod +from abc import ABC from typing import Any, Dict, List, Optional import attr +from opmon import Probe +from opmon.errors import StatisticNotImplementedForTypeException + @attr.s(auto_attribs=True) class StatisticComputation: @@ -31,9 +34,25 @@ class Statistic(ABC): name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", cls.__name__) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() - @abstractmethod - def computation(self, value: str = "values") -> List[StatisticComputation]: - return NotImplemented + def computation(self, metric: Probe) -> List[StatisticComputation]: + if metric.type == "scalar": + return self._scalar_computation(metric) + elif metric.type == "histogram": + return self._histogram_computation(metric) + else: + raise StatisticNotImplementedForTypeException( + f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})" + ) + + def _scalar_computation(self, metric: Probe) -> List[StatisticComputation]: + raise StatisticNotImplementedForTypeException( + f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})" + ) + + def _histogram_computation(self, metric: Probe) -> List[StatisticComputation]: + raise StatisticNotImplementedForTypeException( + f"Statistic {self.name()} not implemented for type {metric.type} ({metric.name})" + ) @classmethod def from_dict(cls, config_dict: Dict[str, Any]): @@ -42,30 +61,30 @@ class Statistic(ABC): class Count(Statistic): - def computation(self, value: str = "values"): + def _scalar_computation(self, metric: Probe): return [ StatisticComputation( - point=f"COUNT({value})", + point=f"COUNT({metric.name})", name=self.name(), ) ] class Sum(Statistic): - def computation(self, value: str = "values"): + def _scalar_computation(self, metric: Probe): return [ StatisticComputation( - point=f"SUM({value})", + point=f"SUM({metric.name})", name=self.name(), ) ] class Mean(Statistic): - def computation(self, value: str = "values"): + def _scalar_computation(self, metric: Probe): return [ StatisticComputation( - point=f"AVG({value})", + point=f"AVG({metric.name})", name=self.name(), ) ] @@ -75,11 +94,14 @@ class Quantile(Statistic): number_of_quantiles: int = 100 quantile: int = 50 - def computation(self, value: str = "values"): + def _scalar_computation(self, metric: Probe): return [ StatisticComputation( point=f""" - APPROX_QUANTILES({value}, {self.number_of_quantiles})[OFFSET({self.quantile})] + APPROX_QUANTILES( + {metric.name}, + {self.number_of_quantiles} + )[OFFSET({self.quantile})] """, name=self.name(), ) @@ -90,30 +112,252 @@ class Quantile(Statistic): class Percentile(Statistic): percentiles: List[int] = [50, 90, 99] - def computation(self, value: str = "values"): + def _scalar_computation(self, metric: Probe): return [ StatisticComputation( point=f""" `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( {percentile}, - STRUCT( - {value} + STRUCT< + bucket_count INT64, + sum INT64, + histogram_type INT64, + `range` ARRAY, + VALUES + ARRAY + >>(1, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0), + 1, + [ + 0, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0 + ) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0) + ], + [ + STRUCT( + COALESCE( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), + 0) + 0.0001 + ) AS FLOAT64 + ), 0.0 + ), 1 + ) + ] ) ).percentile """, lower=f""" `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( {percentile}, - STRUCT( - {value} + STRUCT< + bucket_count INT64, + sum INT64, + histogram_type INT64, + `range` ARRAY, + VALUES + ARRAY + >>(1, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0), + 1, + [ + 0, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0 + ) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0) + ], + [ + STRUCT( + COALESCE( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), + 0) + 0.0001 + ) AS FLOAT64 + ), 0.0 + ), 1 + ) + ] ) ).low """, upper=f""" `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( {percentile}, - STRUCT( - {value} + STRUCT< + bucket_count INT64, + sum INT64, + histogram_type INT64, + `range` ARRAY, + VALUES + ARRAY + >>(1, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0), + 1, + [ + 0, + COALESCE( + SAFE_CAST( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), 0 + ) + 0.0001 + ) + AS FLOAT64) + AS INT64), + 0) + ], + [ + STRUCT( + COALESCE( + SAFE_CAST( + FORMAT( + "%.*f", + 2, + COALESCE( + mozfun.glam.histogram_bucket_from_value( + {metric.name}_buckets, + SAFE_CAST({metric.name} AS FLOAT64) + ), + 0) + 0.0001 + ) AS FLOAT64 + ), 0.0 + ), 1 + ) + ] + ) + ).high + """, + name=self.name(), + parameter=str(percentile), + ) + for percentile in self.percentiles + ] + + def _histogram_computation(self, metric: Probe) -> List[StatisticComputation]: + return [ + StatisticComputation( + point=f""" + `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( + {percentile}, + STRUCT( + histogram_normalized_sum( + mozfun.hist.merge( + ARRAY_AGG({metric.name} IGNORE NULLS) + ).values, 1.0 + ) + ) + ).percentile + """, + lower=f""" + `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( + {percentile}, + STRUCT( + histogram_normalized_sum( + mozfun.hist.merge( + ARRAY_AGG({metric.name} IGNORE NULLS) + ).values, 1.0 + ) + ) + ).low + """, + upper=f""" + `moz-fx-data-shared-prod`.udf_js.jackknife_percentile_ci( + {percentile}, + STRUCT( + histogram_normalized_sum( + mozfun.hist.merge( + ARRAY_AGG({metric.name} IGNORE NULLS) + ).values, 1.0 + ) ) ).high """, diff --git a/opmon/templates/histogram_query.sql b/opmon/templates/histogram_query.sql deleted file mode 100644 index 3761d14..0000000 --- a/opmon/templates/histogram_query.sql +++ /dev/null @@ -1,229 +0,0 @@ -{{ header }} - -{% include 'population.sql' %}, - --- for each data source that is used --- select the metric values -{% for data_source, metrics in metrics_per_dataset.items() -%} -merged_metrics_{{ data_source }} AS ( - SELECT - DATE({{ metrics[0].data_source.submission_date_column }}) AS submission_date, - {{ config.population.data_source.client_id_column }} AS client_id, - p.population_build_id AS build_id, - ARRAY< - STRUCT< - metric STRING, - histograms ARRAY< - STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - values ARRAY>> - >> - >[ - {% for metric in metrics %} - ( - "{{ metric.name }}", - {{ metric.select_expression }} - ) - {{ "," if not loop.last else "" }} - {% endfor %} - ] AS metrics, - FROM - {{ metrics[0].data_source.from_expression }} - RIGHT JOIN - ( - SELECT - client_id AS population_client_id, - submission_date AS population_submission_date, - build_id AS population_build_id - FROM - population - ) AS p - ON - {{ metrics[0].data_source.submission_date_column }} = p.population_submission_date AND - {{ config.population.data_source.client_id_column }} = p.population_client_id - WHERE - {% if config.xaxis.value == "submission_date" %} - DATE({{ metrics[0].data_source.submission_date_column }}) = DATE('{{ submission_date }}') - {% else %} - -- when aggregating by build_id, only use the most recent 14 days of data - DATE({{ metrics[0].data_source.submission_date_column }}) BETWEEN DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) AND DATE('{{ submission_date }}') - {% endif %} - GROUP BY - submission_date, - client_id, - build_id -), -{% endfor %} - --- combine the metrics from all the data sources -joined_histograms AS ( - SELECT - population.submission_date AS submission_date, - population.client_id AS client_id, - population.build_id, - {% for dimension in dimensions %} - population.{{ dimension.name }} AS {{ dimension.name }}, - {% endfor %} - population.branch AS branch, - {% if metrics_per_dataset != {} %} - ARRAY_CONCAT( - {% for data_source, metrics in metrics_per_dataset.items() %} - merged_metrics_{{ data_source }}.metrics - {% endfor %} - ) AS metrics - {% else %} - [] AS metrics, - {% endif %} - FROM population - {% for data_source, metrics in metrics_per_dataset.items() %} - LEFT JOIN merged_metrics_{{ data_source }} - USING(submission_date, client_id) - {% endfor %} -), - --- merge histograms if client has multiple -merged_histograms AS ( - SELECT - submission_date, - client_id, - build_id, - branch, - {% for dimension in dimensions %} - {{ dimension.name }}, - {% endfor %} - {% if metrics_per_dataset != {} %} - ARRAY_AGG( - STRUCT< - name STRING, - histogram STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - values ARRAY> - > - > ( - metric, - CASE - WHEN - histograms IS NULL - THEN - NULL - ELSE - mozfun.hist.merge(histograms) - END - ) - ) AS metrics - {% else %} - [] AS metrics - {% endif %} - FROM - joined_histograms - CROSS JOIN - UNNEST(metrics) - {% if not config.population.monitor_entire_population %} - WHERE branch IN ( - -- If branches are not defined, assume it's a rollout - -- and fall back to branches labeled as enabled/disabled - {% if config.population.branches|length > 0 -%} - {% for branch in config.population.branches -%} - "{{ branch }}" - {{ "," if not loop.last else "" }} - {% endfor -%} - {% else -%} - "enabled", "disabled" - {% endif -%} - ) - {% endif %} - GROUP BY - submission_date, - client_id, - build_id, - {% for dimension in dimensions %} - {{ dimension.name }}, - {% endfor %} - branch -), - --- Cast histograms to have string keys so we can use the histogram normalization function -normalized_histograms AS ( - SELECT - submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - {% if metrics_per_dataset != {} %} - name AS metric, - {% else %} - NULL AS metric, - {% endif %} - {% if metrics_per_dataset != {} %} - STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - VALUES - ARRAY> - >(histogram.bucket_count, - histogram.sum, - histogram.histogram_type, - histogram.range, - ARRAY(SELECT AS STRUCT CAST(keyval.key AS STRING), keyval.value FROM UNNEST(histogram.values) keyval) - ) AS value - {% else %} - NULL AS value - {% endif %} - FROM merged_histograms - CROSS JOIN UNNEST(metrics) -) - -{% if first_run or config.xaxis.value == "submission_date" -%} -SELECT - submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - metric AS probe, - value -FROM - normalized_histograms -{% else -%} -SELECT - DATE('{{ submission_date }}') AS submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - metric AS probe, - value -FROM normalized_histograms _current -WHERE - PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) -UNION ALL -SELECT - DATE('{{ submission_date }}') AS submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - metric AS probe, - value -FROM normalized_histograms _prev -WHERE - PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) < DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) - AND submission_date = DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 1 DAY) -{% endif -%} diff --git a/opmon/templates/scalar_query.sql b/opmon/templates/metric_query.sql similarity index 65% rename from opmon/templates/scalar_query.sql rename to opmon/templates/metric_query.sql index 3c8f1de..58e28f4 100644 --- a/opmon/templates/scalar_query.sql +++ b/opmon/templates/metric_query.sql @@ -5,25 +5,14 @@ -- for each data source that is used -- select the metric values {% for data_source, metrics in metrics_per_dataset.items() -%} -merged_scalars_{{ data_source }} AS ( +merged_metrics_{{ data_source }} AS ( SELECT DATE({{ metrics[0].data_source.submission_date_column }}) AS submission_date, {{ config.population.data_source.client_id_column }} AS client_id, p.population_build_id AS build_id, - ARRAY< - STRUCT< - name STRING, - value FLOAT64 - > - >[ - {% for metric in metrics -%} - ( - "{{ metric.name }}", - CAST({{ metric.select_expression }} AS FLOAT64) - ) - {{ "," if not loop.last else "" }} - {% endfor -%} - ] AS metrics, + {% for metric in metrics -%} + {{ metric.select_expression }} AS {{ metric.name }}, + {% endfor -%} FROM {{ metrics[0].data_source.from_expression }} RIGHT JOIN @@ -53,7 +42,7 @@ merged_scalars_{{ data_source }} AS ( {% endfor %} -- combine the metrics from all the data sources -joined_scalars AS ( +joined_metrics AS ( SELECT population.submission_date AS submission_date, population.client_id AS client_id, @@ -62,25 +51,23 @@ joined_scalars AS ( population.{{ dimension.name }} AS {{ dimension.name }}, {% endfor %} population.branch AS branch, - ARRAY_CONCAT( - {% for data_source, metrics in metrics_per_dataset.items() -%} - COALESCE(merged_scalars_{{ data_source }}.metrics, []) - {{ "," if not loop.last else "" }} - {% endfor -%} - ) AS metrics + {% for data_source, metrics in metrics_per_dataset.items() -%} + {% for metric in metrics -%} + {{ metric.name }}, + {% endfor -%} + {% endfor -%} FROM population {% for data_source, metrics in metrics_per_dataset.items() -%} - LEFT JOIN merged_scalars_{{ data_source }} + LEFT JOIN merged_metrics_{{ data_source }} USING(submission_date, client_id, build_id) {% endfor %} ), --- unnest the combined metrics so we get --- the metric values for each client for each date -flattened_scalars AS ( - SELECT * EXCEPT(metrics) - FROM joined_scalars - CROSS JOIN UNNEST(metrics) +-- normalize histograms and apply filters +normalized_metrics AS ( + SELECT + * + FROM joined_metrics {% if not config.population.monitor_entire_population %} WHERE branch IN ( -- If branches are not defined, assume it's a rollout @@ -98,44 +85,20 @@ flattened_scalars AS ( ) {% if first_run or config.xaxis.value == "submission_date" -%} SELECT - submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - name, - value + * FROM - flattened_scalars + normalized_metrics {% else -%} -- if data is aggregated by build ID, then aggregate data with previous runs SELECT - DATE('{{ submission_date }}') AS submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - name, - value -FROM flattened_scalars _current + * +FROM normalized_metrics _current WHERE PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) >= DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) UNION ALL SELECT - DATE('{{ submission_date }}') AS submission_date, - client_id, - build_id, - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor %} - branch, - name, - value -FROM flattened_scalars _prev + SELECT * REPLACE (DATE('{{ submission_date }}') AS submission_date) +FROM normalized_metrics _prev WHERE PARSE_DATE('%Y%m%d', CAST(build_id AS STRING)) < DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 14 DAY) AND submission_date = DATE_SUB(DATE('{{ submission_date }}'), INTERVAL 1 DAY) diff --git a/opmon/templates/metric_view.sql b/opmon/templates/metric_view.sql deleted file mode 100644 index eef20c8..0000000 --- a/opmon/templates/metric_view.sql +++ /dev/null @@ -1,167 +0,0 @@ -{{ header }} - -CREATE OR REPLACE VIEW - `{{ gcp_project }}.{{ dataset }}.{{ normalized_slug }}` -AS --- Prepare scalar values - -WITH filtered_scalars AS ( - SELECT * - FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}_scalar` - WHERE {% include 'where_clause.sql' -%} -), - -log_min_max AS ( - SELECT - name, - LOG(IF(MIN(value) <= 0, 1, MIN(value)), 2) log_min, - LOG(IF(MAX(value) <= 0, 1, MAX(value)), 2) log_max - FROM - filtered_scalars - GROUP BY name), - -buckets_by_metric AS ( - SELECT - name, - ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST( - mozfun.glam.histogram_generate_scalar_buckets(log_min, log_max, 100) - ) AS bucket ORDER BY bucket) AS buckets - FROM log_min_max -), - -aggregated_scalars AS ( - SELECT - client_id, - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else %} - build_id, - {% endif %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - name, - value - FROM - filtered_scalars -), - --- Prepare histogram values -filtered_histograms AS ( - SELECT * - FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}_histogram` - WHERE {% include 'where_clause.sql' -%} -), - -normalized_histograms AS ( - SELECT - client_id, - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else -%} - build_id, - {% endif -%} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - probe, - {% if metrics_per_dataset != {} %} - STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - VALUES - ARRAY> - >( - ANY_VALUE(value.bucket_count), - ANY_VALUE(value.sum), - ANY_VALUE(value.histogram_type), - ANY_VALUE(value.range), - mozfun.glam.histogram_normalized_sum( - mozfun.hist.merge(ARRAY_AGG(value IGNORE NULLS)).values, - 1.0 - ) - ) AS value - {% else %} - NULL AS value - {% endif %} - FROM filtered_histograms - GROUP BY - client_id, - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else -%} - build_id, - {% endif %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - probe) - --- Cast histograms to have FLOAT64 keys --- so we can use the histogram jackknife percentile function. -SELECT - client_id, - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else -%} - build_id, - {% endif %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - probe AS probe, - {% if metrics_per_dataset != {} %} - STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - VALUES - ARRAY - >>(value.bucket_count, - value.sum, - value.histogram_type, - value.range, - ARRAY(SELECT AS STRUCT CAST(keyval.key AS FLOAT64), keyval.value FROM UNNEST(value.values) keyval) - ) AS value - {% else %} - NULL AS value - {% endif %} -FROM normalized_histograms -UNION ALL -SELECT - client_id, - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else %} - build_id, - {% endif %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - branch, - name AS probe, - STRUCT< - bucket_count INT64, - sum INT64, - histogram_type INT64, - `range` ARRAY, - VALUES - ARRAY - >>(1, - COALESCE(SAFE_CAST(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64) AS INT64), 0), - 1, - [0, COALESCE(SAFE_CAST(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64) AS INT64), 0)], - [STRUCT( - COALESCE(SAFE_CAST(FORMAT("%.*f", 2, COALESCE(mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)), 0) + 0.0001) AS FLOAT64), 0.0), 1 - )] - ) AS value -FROM - aggregated_scalars - LEFT JOIN buckets_by_metric USING(name) diff --git a/opmon/templates/normalized_sum_udf.sql b/opmon/templates/normalized_sum_udf.sql new file mode 100644 index 0000000..301d6c7 --- /dev/null +++ b/opmon/templates/normalized_sum_udf.sql @@ -0,0 +1,40 @@ +CREATE TEMPORARY FUNCTION histogram_normalized_sum( + arrs ARRAY>, + weight FLOAT64 +) +RETURNS ARRAY> AS ( + -- Input: one histogram for a single client. + -- Returns the normalized sum of the input maps. + -- It returns the total_count[k] / SUM(total_count) + -- for each key k. + ( + WITH total_counts AS ( + SELECT + sum(a.value) AS total_count + FROM + UNNEST(arrs) AS a + ), + summed_counts AS ( + SELECT + a.key AS k, + SUM(a.value) AS v + FROM + UNNEST(arrs) AS a + GROUP BY + a.key + ) + SELECT + ARRAY_AGG( + STRUCT( + k, + COALESCE(SAFE_DIVIDE(1.0 * v, total_count), 0) * weight + ) + ORDER BY + SAFE_CAST(k AS INT64) + ) + FROM + summed_counts + CROSS JOIN + total_counts + ) +); diff --git a/opmon/templates/statistics.sql b/opmon/templates/statistics.sql index 3550a4e..516c873 100644 --- a/opmon/templates/statistics.sql +++ b/opmon/templates/statistics.sql @@ -1,4 +1,39 @@ -WITH merged AS ( +{{ header }} + +{% include 'normalized_sum_udf.sql' %} + +WITH filtered_metrics AS ( + SELECT * + FROM `{{ gcp_project }}.{{ dataset }}_derived.{{ normalized_slug }}` + WHERE {% include 'where_clause.sql' -%} +), + +-- bucket metrics that use percentile +buckets_by_metric AS ( + SELECT + [] AS dummy, + {% set seen_metrics = [] %} + {% for summary in summaries %} + {% if summary.statistic.name == "percentile" %} + {% if summary.metric.type == "scalar" -%} + {% if summary.metric.name not in seen_metrics %} + {% if seen_metrics.append(summary.metric.name) %} {% endif %} + ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST( + mozfun.glam.histogram_generate_scalar_buckets( + LOG(IF(MIN(value) <= 0, 1, MIN({{ summary.metric.name }})), 2), + LOG(IF(MAX(value) <= 0, 1, MAX({{ summary.metric.name }})), 2), + 100 + ) + ) AS bucket ORDER BY bucket) AS {{ summary.metric.name }}_buckets, + {% endif %} + {% endif %} + {% endif %} + {% endfor %} + FROM filtered_metrics +), + + +stats AS ( SELECT {% if config.xaxis.value == "submission_date" -%} submission_date, @@ -9,24 +44,37 @@ WITH merged AS ( {{ dimension.name }}, {% endfor -%} branch, - probe AS metric, - mozfun.hist.merge(ARRAY_AGG(value IGNORE NULLS)).values AS values + ARRAY>[ + {% for summary in summaries %} + STRUCT( + '{{ summary.metric.name }}' AS metric, + '{{ summary.statistic.name }}' AS statistic, + {{ summary.statistic.point }} AS point + {% if summary.statistic.lower -%} + ,{{ summary.statistic.lower }} AS lower + {% endif -%} + {% if summary.statistic.upper -%} + ,{{ summary.statistic.upper }} AS upper + {% endif -%} + {% if summary.statistic.parameter -%} + ,'{{ summary.statistic.parameter }}' AS parameter + {% endif -%} + ) + {{ "," if not loop.last else "" }} + {% endfor %} + ] AS statistics FROM `{{ gcp_project }}.{{ dataset }}.{{ normalized_slug }}` + CROSS JOIN buckets_by_metric WHERE submission_date = DATE("{{ submission_date }}") GROUP BY - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - build_id, - {% endif %} - branch, - metric -), stats AS ( - SELECT {% if config.xaxis.value == "submission_date" -%} submission_date, {% else %} @@ -35,43 +83,9 @@ WITH merged AS ( {% for dimension in dimensions -%} {{ dimension.name }}, {% endfor -%} - branch, - metric, - CASE value - {% for probe in probes %} - WHEN probe = "{{ probe.metric.name }}" - THEN ARRAY>[( - {% for stat in probe.statistics %} - {{ stat.name }} AS statistic, - {{ stat.point }} AS point, - {% if stat.lower -%} - stat.lower AS lower, - {% endif -%} - {% if stat.upper -%} - stat.upper AS upper, - {% endif -%} - {% if stat.parameter -%} - stat.parameter AS parameter, - {% endif -%} - {% enfor %} - )] - {% endfor %} - ELSE NULL - END AS values - FROM - merged - GROUP BY - {% if config.xaxis.value == "submission_date" -%} - submission_date, - {% else %} - {% for dimension in dimensions -%} - {{ dimension.name }}, - {% endfor -%} - build_id, - {% endif %} - branch, - metric + branch ) + SELECT {% if config.xaxis.value == "submission_date" -%} submission_date, @@ -82,10 +96,10 @@ SELECT {{ dimension.name }}, {% endfor -%} branch, - metric, + statistic.metric AS metric, statistic.name AS statistic, statistic.point AS point, statistic.lower AS lower, statistic.upper AS upper, statistic.parameter AS parameter -FROM stats, UNNEST(values) as statistic +FROM stats, UNNEST(statistics) as statistic