Replace GLAM temp functions with persistent functions (#1523)

* Replace GLAM temp functions with persistent functions

* Add generated sql

* Fix typo in udf name

* Add missing files and fully qualify udfs

* Add missing namespace

* Namespace even more things

* format sql
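
Every diff below follows the same pattern: a query-local CREATE TEMP FUNCTION is deleted and its call sites are rewritten against the fully qualified persistent equivalent in the mozfun.glam and mozfun.map namespaces. A minimal before/after sketch of the flattening helper (the one-bucket histogram literal is illustrative):

-- Before: every generated query shipped its own JS helper.
-- CREATE TEMP FUNCTION udf_js_flatten(...) ... LANGUAGE js AS '...';
-- SELECT udf_js_flatten([STRUCT("0" AS key, 1.5 AS value)]);
-- After: one shared persistent function, no per-query definition.
SELECT mozfun.glam.histogram_cast_json([STRUCT("0" AS key, 1.5 AS value)]);
-- expected: a JSON string like '{"0":1.5}' (values rounded to four decimals)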
This commit is contained in:
Anthony Miyaguchi 2020-11-05 13:42:09 -08:00 committed by GitHub
Parent 616574690e
Commit b77b542743
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 35 additions and 369 deletions

View file

@@ -1,17 +1,5 @@
{{ header }}
CREATE TEMP FUNCTION udf_js_flatten(histogram ARRAY<STRUCT<key STRING, value FLOAT64>>)
RETURNS STRING
DETERMINISTIC LANGUAGE js
AS
'''
let obj = {};
histogram.map(function(r) {
obj[r.key] = parseFloat(r.value.toFixed(4));
});
return JSON.stringify(obj);
''';
SELECT
channel,
app_version as version,
@@ -27,8 +15,8 @@ SELECT
SUBSTR(REPLACE(key, r"\x00", ""), 0, 200) AS metric_key,
client_agg_type,
MAX(total_users) as total_users,
MAX(IF(agg_type = "histogram", udf_js_flatten(aggregates), NULL)) as histogram,
MAX(IF(agg_type = "percentiles", udf_js_flatten(aggregates), NULL)) as percentiles,
MAX(IF(agg_type = "histogram", mozfun.glam.histogram_cast_json(aggregates), NULL)) as histogram,
MAX(IF(agg_type = "percentiles", mozfun.glam.histogram_cast_json(aggregates), NULL)) as percentiles,
FROM
`{{ dataset }}.{{ prefix }}__view_probe_counts_v1`
GROUP BY

View file

@@ -1,16 +1,4 @@
-- query for org_mozilla_fenix_glam_nightly__extract_probe_counts_v1;
CREATE TEMP FUNCTION udf_js_flatten(histogram ARRAY<STRUCT<key STRING, value FLOAT64>>)
RETURNS STRING DETERMINISTIC
LANGUAGE js
AS
'''
let obj = {};
histogram.map(function(r) {
obj[r.key] = parseFloat(r.value.toFixed(4));
});
return JSON.stringify(obj);
''';
SELECT
channel,
app_version AS version,
@@ -30,8 +18,10 @@ SELECT
SUBSTR(REPLACE(key, r"\x00", ""), 0, 200) AS metric_key,
client_agg_type,
MAX(total_users) AS total_users,
MAX(IF(agg_type = "histogram", udf_js_flatten(aggregates), NULL)) AS histogram,
MAX(IF(agg_type = "percentiles", udf_js_flatten(aggregates), NULL)) AS percentiles,
MAX(IF(agg_type = "histogram", mozfun.glam.histogram_cast_json(aggregates), NULL)) AS histogram,
MAX(
IF(agg_type = "percentiles", mozfun.glam.histogram_cast_json(aggregates), NULL)
) AS percentiles,
FROM
`glam_etl.org_mozilla_fenix_glam_nightly__view_probe_counts_v1`
GROUP BY

View file

@@ -31,7 +31,7 @@ CREATE TEMP FUNCTION udf_merged_user_data(old_aggs ANY TYPE, new_aggs ANY TYPE)
key,
process,
agg_type,
udf.map_sum(ARRAY_CONCAT_AGG(aggregates)) AS histogram_aggregates
mozfun.map.sum(ARRAY_CONCAT_AGG(aggregates)) AS histogram_aggregates
FROM unnested
GROUP BY
first_bucket,
@@ -59,90 +59,6 @@ CREATE TEMP FUNCTION udf_merged_user_data(old_aggs ANY TYPE, new_aggs ANY TYPE)
)
);
CREATE TEMP FUNCTION udf_normalized_sum (arrs ARRAY<STRUCT<key STRING, value INT64>>)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
-- Returns the normalized sum of the input maps.
-- It returns the total_count[k] / SUM(total_count)
-- for each key k.
(
WITH total_counts AS (
SELECT
sum(a.value) AS total_count
FROM
UNNEST(arrs) AS a
),
summed_counts AS (
SELECT
a.key AS k,
SUM(a.value) AS v
FROM
UNNEST(arrs) AS a
GROUP BY
a.key
),
final_values AS (
SELECT
STRUCT<key STRING, value FLOAT64>(
k,
COALESCE(SAFE_DIVIDE(1.0 * v, total_count), 0)
) AS record
FROM
summed_counts
CROSS JOIN
total_counts
)
SELECT
ARRAY_AGG(record)
FROM
final_values
)
);
CREATE TEMP FUNCTION udf_normalize_histograms (
arrs ARRAY<STRUCT<
first_bucket INT64,
last_bucket INT64,
num_buckets INT64,
latest_version INT64,
metric STRING,
metric_type STRING,
key STRING,
process STRING,
agg_type STRING,
aggregates ARRAY<STRUCT<key STRING, value INT64>>>>)
RETURNS ARRAY<STRUCT<
first_bucket INT64,
last_bucket INT64,
num_buckets INT64,
latest_version INT64,
metric STRING,
metric_type STRING,
key STRING,
process STRING,
agg_type STRING,
aggregates ARRAY<STRUCT<key STRING, value FLOAT64>>>> AS (
(
WITH normalized AS (
SELECT
first_bucket,
last_bucket,
num_buckets,
latest_version,
metric,
metric_type,
key,
process,
agg_type,
udf_normalized_sum(aggregates) AS aggregates
FROM UNNEST(arrs))
SELECT ARRAY_AGG((first_bucket, last_bucket, num_buckets, latest_version, metric, metric_type, key, process, agg_type, aggregates))
FROM normalized
));
WITH clients_histogram_aggregates_new AS
(SELECT *
FROM clients_histogram_aggregates_new_v1
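
udf.map_sum becomes mozfun.map.sum here; the ARRAY_CONCAT_AGG above still does the merging, and the function just sums values per key in a single array of key/value structs. A quick sanity check with invented entries:

SELECT mozfun.map.sum([STRUCT('a' AS key, 1 AS value), ('a', 2), ('b', 3)]);
-- expected: [('a', 3), ('b', 3)] (key order not guaranteed)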

View file

@@ -1,44 +1,3 @@
CREATE TEMP FUNCTION udf_normalized_sum(arrs ARRAY<STRUCT<key STRING, value INT64>>, sampled BOOL)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
-- Input: one histogram for a single client.
-- Returns the normalized sum of the input maps.
-- It returns the total_count[k] / SUM(total_count)
-- for each key k.
(
WITH total_counts AS (
SELECT
sum(a.value) AS total_count
FROM
UNNEST(arrs) AS a
),
summed_counts AS (
SELECT
a.key AS k,
SUM(a.value) AS v
FROM
UNNEST(arrs) AS a
GROUP BY
a.key
),
final_values AS (
SELECT
STRUCT<key STRING, value FLOAT64>(
k,
-- Weight probes from Windows release clients to account for 10% sampling
COALESCE(SAFE_DIVIDE(1.0 * v, total_count), 0) * IF(sampled, 10, 1)
) AS record
FROM
summed_counts
CROSS JOIN
total_counts
)
SELECT
ARRAY_AGG(record)
FROM
final_values
)
);
WITH filtered_data AS (
SELECT
sample_id,
@@ -101,7 +60,10 @@ normalized_histograms AS (
-- This returns true if at least 1 row has sampled=true.
-- ~0.0025% of the population uses more than 1 os for the same set of dimensions
-- and in this case we treat them as Windows+Release users when fudging numbers
udf_normalized_sum(udf.map_sum(ARRAY_CONCAT_AGG(aggregates)), MAX(sampled)) AS aggregates
mozfun.glam.histogram_normalized_sum(
mozfun.map.sum(ARRAY_CONCAT_AGG(aggregates)),
IF(MAX(sampled), 10.0, 1.0)
) AS aggregates
)
FROM
all_combos
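
Note the signature change: the temp udf_normalized_sum took a BOOL and hard-coded the 10x multiplier, while mozfun.glam.histogram_normalized_sum takes the weight itself, hence IF(MAX(sampled), 10.0, 1.0) at the call site. A sketch with an invented two-bucket histogram:

SELECT mozfun.glam.histogram_normalized_sum(
  [STRUCT('0' AS key, 3 AS value), ('1', 1)],
  10.0  -- weight for the 10%-sampled Windows release population
);
-- expected: normalized shares 0.75 and 0.25, each multiplied by the weight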

View file

@@ -1,45 +1,3 @@
CREATE TEMP FUNCTION udf_exponential_buckets(min FLOAT64, max FLOAT64, nBuckets FLOAT64)
RETURNS ARRAY<FLOAT64>
DETERMINISTIC LANGUAGE js AS
'''
let logMax = Math.log(max);
let current = min;
if (current === 0) {
current = 1;
} // If starting from 0, the second bucket should be 1 rather than 0
let retArray = [0, current];
for (let bucketIndex = 2; bucketIndex < Math.min(nBuckets, max, 10000); bucketIndex++) {
let logCurrent = Math.log(current);
let logRatio = (logMax - logCurrent) / (nBuckets - bucketIndex);
let logNext = logCurrent + logRatio;
let nextValue = Math.round(Math.exp(logNext));
current = nextValue > current ? nextValue : current + 1;
retArray[bucketIndex] = current;
}
return retArray
''';
CREATE TEMP FUNCTION udf_linear_buckets(min FLOAT64, max FLOAT64, nBuckets FLOAT64)
RETURNS ARRAY<FLOAT64>
DETERMINISTIC LANGUAGE js AS
'''
let result = [0];
for (let i = 1; i < Math.min(nBuckets, max, 10000); i++) {
let linearRange = (min * (nBuckets - 1 - i) + max * (i - 1)) / (nBuckets - 2);
result.push(Math.round(linearRange));
}
return result;
''';
CREATE TEMP FUNCTION udf_to_string_arr(buckets ARRAY<INT64>)
RETURNS ARRAY<STRING> AS (
(
SELECT ARRAY_AGG(CAST(bucket AS STRING))
FROM UNNEST(buckets) AS bucket
)
);
CREATE TEMP FUNCTION udf_get_buckets(min INT64, max INT64, num INT64, metric_type STRING)
RETURNS ARRAY<INT64> AS (
(
@@ -47,8 +5,8 @@ RETURNS ARRAY<INT64> AS (
SELECT
CASE
WHEN metric_type = 'histogram-exponential'
THEN udf_exponential_buckets(min, max, num)
ELSE udf_linear_buckets(min, max, num)
THEN mozfun.glam.histogram_generate_exponential_buckets(min, max, num)
ELSE mozfun.glam.histogram_generate_linear_buckets(min, max, num)
END AS arr
)
@@ -58,41 +16,6 @@ RETURNS ARRAY<INT64> AS (
)
);
CREATE TEMP FUNCTION udf_buckets_to_map (buckets ARRAY<STRING>)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
-- Given an array of values, transform them into a histogram MAP
-- with the number of each key in the `buckets` array
(
SELECT
ARRAY_AGG(STRUCT<key STRING, value FLOAT64>(bucket, 1.0))
FROM
UNNEST(buckets) AS bucket
)
);
CREATE TEMP FUNCTION udf_fill_buckets(input_map ARRAY<STRUCT<key STRING, value FLOAT64>>, buckets ARRAY<STRING>, total_users INT64)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
-- Given a MAP `input_map`, fill in any missing keys with value `0.0`
(
WITH total_counts AS (
SELECT
key,
-- Dirichlet distribution density for each bucket in a histogram
-- https://docs.google.com/document/d/1ipy1oFIKDvHr3R6Ku0goRjS11R1ZH1z2gygOGkSdqUg
SAFE_DIVIDE(COALESCE(e.value, 0.0) + SAFE_DIVIDE(1, ARRAY_LENGTH(buckets)), total_users + 1) AS value
FROM
UNNEST(buckets) as key
LEFT JOIN
UNNEST(input_map) AS e ON SAFE_CAST(key AS STRING) = e.key
)
SELECT
ARRAY_AGG(STRUCT<key STRING, value FLOAT64>(SAFE_CAST(key AS STRING), value))
FROM
total_counts
)
);
SELECT
IF(os = '*', NULL, os) AS os,
app_version,
@@ -105,9 +28,9 @@ SELECT
agg_type AS client_agg_type,
'histogram' AS agg_type,
CAST(ROUND(SUM(record.value)) AS INT64) AS total_users,
udf_fill_buckets(
udf.map_sum(ARRAY_AGG(record)),
udf_to_string_arr(udf_get_buckets(first_bucket, last_bucket, num_buckets, metric_type)),
mozfun.glam.histogram_fill_buckets_dirichlet(
mozfun.map.sum(ARRAY_AGG(record)),
mozfun.glam.histogram_buckets_cast_string_array(udf_get_buckets(first_bucket, last_bucket, num_buckets, metric_type)),
CAST(ROUND(SUM(record.value)) AS INT64)
) AS aggregates
FROM clients_histogram_bucket_counts_v1
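
udf_get_buckets survives as a temp function but now delegates generation to the mozfun bucket builders, and the udf_to_string_arr/udf_fill_buckets pair becomes histogram_buckets_cast_string_array plus histogram_fill_buckets_dirichlet. A rough sketch of the fill step in isolation (counts and bucket list invented):

SELECT mozfun.glam.histogram_fill_buckets_dirichlet(
  [STRUCT('1' AS key, 5.0 AS value)],  -- observed per-bucket counts
  ['0', '1', '10', '100'],             -- full bucket list, as strings
  5                                    -- total users, feeding the Dirichlet prior
);
-- expected: all four buckets present, with missing buckets smoothed to a small non-zero mass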

View file

@@ -1,66 +1,3 @@
CREATE TEMP FUNCTION udf_fill_buckets(input_map ARRAY<STRUCT<key STRING, value FLOAT64>>, buckets ARRAY<STRING>)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
-- Given a MAP `input_map`, fill in any missing keys with value `0.0`
(
WITH total_counts AS (
SELECT
key,
COALESCE(e.value, 0.0) AS value
FROM
UNNEST(buckets) as key
LEFT JOIN
UNNEST(input_map) AS e ON SAFE_CAST(key AS STRING) = e.key
)
SELECT
ARRAY_AGG(STRUCT<key STRING, value FLOAT64>(SAFE_CAST(key AS STRING), value))
FROM
total_counts
)
);
CREATE TEMPORARY FUNCTION udf_generate_buckets(min_bucket FLOAT64, max_bucket FLOAT64, num_buckets INT64)
RETURNS ARRAY<STRING>
DETERMINISTIC LANGUAGE js
AS
'''
let bucket_size = (max_bucket - min_bucket) / num_buckets;
let buckets = new Set();
for (let bucket = min_bucket; bucket < max_bucket; bucket += bucket_size) {
buckets.add(Math.pow(2, bucket).toFixed(2));
}
return Array.from(buckets);
''';
CREATE TEMP FUNCTION udf_get_values(required ARRAY<FLOAT64>, values ARRAY<FLOAT64>)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
(
SELECT ARRAY_AGG(record)
FROM (
SELECT
STRUCT<key STRING, value FLOAT64>(
CAST(k AS STRING),
values[OFFSET(CAST(k AS INT64))]
) as record
FROM
UNNEST(required) AS k
)
)
);
CREATE TEMP FUNCTION udf_bucket (
buckets ARRAY<STRING>,
val FLOAT64
)
RETURNS FLOAT64 AS (
-- Bucket `value` into a histogram with min_bucket, max_bucket and num_buckets
(
SELECT MAX(CAST(bucket AS FLOAT64))
FROM UNNEST(buckets) AS bucket
WHERE val >= CAST(bucket AS FLOAT64)
)
);
CREATE TEMP FUNCTION udf_boolean_buckets(
scalar_aggs ARRAY<STRUCT<metric STRING, metric_type STRING, key STRING, process STRING, agg_type STRING, value FLOAT64>>)
RETURNS ARRAY<STRUCT<metric STRING,
@@ -139,7 +76,9 @@ log_min_max AS (
GROUP BY 1, 2),
buckets_by_metric AS (
SELECT metric, key, udf_generate_buckets(log_min, log_max, 100) AS buckets
SELECT metric, key, ARRAY(SELECT FORMAT("%.*f", 2, bucket) FROM UNNEST(
mozfun.glam.histogram_generate_scalar_buckets(log_min, log_max, 100)
) AS bucket ORDER BY bucket) AS buckets
FROM log_min_max),
static_combos as (
@@ -208,7 +147,7 @@ bucketed_scalars AS (
process,
agg_type,
-- Keep two decimal places before converting bucket to a string
SAFE_CAST(FORMAT("%.*f", 2, udf_bucket(buckets, SAFE_CAST(value AS FLOAT64)) + 0.0001) AS STRING) AS bucket
SAFE_CAST(FORMAT("%.*f", 2, mozfun.glam.histogram_bucket_from_value(buckets, SAFE_CAST(value AS FLOAT64)) + 0.0001) AS STRING) AS bucket
FROM
user_aggregates
CROSS JOIN UNNEST(scalar_aggregates)
@@ -268,12 +207,12 @@ SELECT
SUM(user_count) AS total_users,
CASE
WHEN metric_type = 'scalar' OR metric_type = 'keyed-scalar'
THEN udf_fill_buckets(
THEN mozfun.glam.histogram_fill_buckets(
ARRAY_AGG(STRUCT<key STRING, value FLOAT64>(bucket, user_count)),
ANY_VALUE(buckets)
)
WHEN metric_type = 'boolean' OR metric_type = 'keyed-scalar-boolean'
THEN udf_fill_buckets(
THEN mozfun.glam.histogram_fill_buckets(
ARRAY_AGG(STRUCT<key STRING, value FLOAT64>(bucket, user_count)),
['always','never','sometimes'])
END AS aggregates
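
For scalars, udf_bucket and udf_fill_buckets map onto mozfun.glam.histogram_bucket_from_value and mozfun.glam.histogram_fill_buckets, while udf_generate_buckets is replaced inline by histogram_generate_scalar_buckets plus explicit formatting. Two isolated sanity checks (bucket boundaries invented):

SELECT mozfun.glam.histogram_bucket_from_value(['1.00', '2.00', '4.00'], 3.5);
-- expected: 2.0, the largest boundary not exceeding the value
SELECT mozfun.glam.histogram_fill_buckets(
  [STRUCT('2.00' AS key, 1.0 AS value)],
  ['1.00', '2.00', '4.00']
);
-- expected: every listed bucket present, absent ones filled with 0.0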

View file

@@ -1,15 +1,3 @@
CREATE TEMP FUNCTION udf_js_flatten(histogram ARRAY<STRUCT<key STRING, value FLOAT64>>)
RETURNS STRING DETERMINISTIC
LANGUAGE js
AS
'''
let obj = {};
histogram.map(function(r) {
obj[r.key] = parseFloat(r.value.toFixed(4));
});
return JSON.stringify(obj);
''';
SELECT
app_version,
COALESCE(os, "*") AS os,
@@ -23,8 +11,10 @@ SELECT
metric_type,
total_users,
-- Using MAX instead of COALESCE since this is not in the GROUP BY.
MAX(IF(agg_type = "histogram", udf_js_flatten(aggregates), NULL)) AS histogram,
MAX(IF(agg_type = "percentiles", udf_js_flatten(aggregates), NULL)) AS percentiles
MAX(IF(agg_type = "histogram", mozfun.glam.histogram_cast_json(aggregates), NULL)) AS histogram,
MAX(
IF(agg_type = "percentiles", mozfun.glam.histogram_cast_json(aggregates), NULL)
) AS percentiles
FROM
`moz-fx-data-shared-prod.telemetry.client_probe_counts`
WHERE

View file

@@ -1,11 +1,11 @@
SELECT
* EXCEPT (aggregates) REPLACE('percentiles' AS agg_type),
ARRAY<STRUCT<key STRING, value FLOAT64>>[
('5', udf_js.glean_percentile(5, aggregates, metric_type)),
('25', udf_js.glean_percentile(25, aggregates, metric_type)),
('50', udf_js.glean_percentile(50, aggregates, metric_type)),
('75', udf_js.glean_percentile(75, aggregates, metric_type)),
('95', udf_js.glean_percentile(95, aggregates, metric_type))
('5', mozfun.glam.percentile(5, aggregates, metric_type)),
('25', mozfun.glam.percentile(25, aggregates, metric_type)),
('50', mozfun.glam.percentile(50, aggregates, metric_type)),
('75', mozfun.glam.percentile(75, aggregates, metric_type)),
('95', mozfun.glam.percentile(95, aggregates, metric_type))
] AS aggregates
FROM
clients_histogram_probe_counts_v1
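
The call sites move from udf_js.glean_percentile to mozfun.glam.percentile, which keeps the same (percentile, histogram, type) argument order, as the wrapper in the last file below confirms. An illustrative call:

SELECT mozfun.glam.percentile(
  50.0,
  [STRUCT('0' AS key, 1.0 AS value), ('1', 1.0), ('2', 2.0)],
  'timing_distribution'  -- illustrative metric type
);
-- expected: 1.0, the key of the bucket where the CDF first reaches 0.5 (no interpolation)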

View file

@@ -1,19 +1,3 @@
CREATE TEMP FUNCTION udf_get_values(required ARRAY<FLOAT64>, values ARRAY<FLOAT64>)
RETURNS ARRAY<STRUCT<key STRING, value FLOAT64>> AS (
(
SELECT ARRAY_AGG(record)
FROM (
SELECT
STRUCT<key STRING, value FLOAT64>(
CAST(k AS STRING),
values[OFFSET(CAST(k AS INT64))]
) as record
FROM
UNNEST(required) AS k
)
)
);
WITH flat_clients_scalar_aggregates AS (
SELECT *,
os = 'Windows' and channel = 'release' AS sampled,
@@ -90,7 +74,7 @@ percentiles AS (
client_agg_type)
SELECT *
REPLACE(udf_get_values(
REPLACE(mozfun.glam.map_from_array_offsets(
[5.0, 25.0, 50.0, 75.0, 95.0],
aggregates
) AS aggregates)
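
udf_get_values is the least obvious rename: mozfun.glam.map_from_array_offsets reads values out of an array at the requested offsets and keys the result by offset, which is how the fixed percentile list above indexes into the aggregates array. A sketch with invented values:

SELECT mozfun.glam.map_from_array_offsets(
  [0.0, 2.0],          -- offsets to extract
  [10.0, 20.0, 30.0]   -- the array being indexed
);
-- expected output shaped like [('0', 10.0), ('2', 30.0)]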

View file

@@ -3,35 +3,9 @@ CREATE OR REPLACE FUNCTION udf_js.glean_percentile(
histogram ARRAY<STRUCT<key STRING, value FLOAT64>>,
type STRING
)
RETURNS FLOAT64 DETERMINISTIC
LANGUAGE js
AS
'''
if (percentile < 0 || percentile > 100) {
throw "percentile must be a value between 0 and 100";
}
let values = histogram.map(bucket => bucket.value);
let total = values.reduce((a, b) => a + b);
let normalized = values.map(value => value / total);
// Find the index into the cumulative distribution function that corresponds
// to the percentile. This undershoots the true value of the percentile.
let acc = 0;
let index = null;
for (let i = 0; i < normalized.length; i++) {
acc += normalized[i];
index = i;
if (acc >= percentile / 100) {
break;
}
}
// NOTE: we do not perform geometric or linear interpolation, but this would
// be the place to implement it.
return histogram[index].key;
''';
RETURNS FLOAT64 DETERMINISTIC AS (
glam.percentile(percentile, histogram, type)
)
SELECT
assert.equals(
2,