Bug 1637926 Add an aet_clients_daily table for end-user analysis (#1288)
This commit is contained in:
Родитель
b619159cfc
Коммит
9bcd1d889c
|
@ -21,6 +21,7 @@ import sys
|
|||
SKIP = {
|
||||
# Access Denied
|
||||
"sql/account_ecosystem_derived/ecosystem_client_id_lookup_v1/query.sql",
|
||||
"sql/account_ecosystem_derived/desktop_clients_daily_v1/query.sql",
|
||||
"sql/activity_stream/impression_stats_flat/view.sql",
|
||||
"sql/activity_stream/tile_id_types/view.sql",
|
||||
"sql/monitoring/deletion_request_volume_v1/query.sql",
|
||||
|
|
|
@ -20,6 +20,18 @@ with DAG(
|
|||
"bqetl_account_ecosystem", default_args=default_args, schedule_interval="0 2 * * *"
|
||||
) as dag:
|
||||
|
||||
account_ecosystem_derived__desktop_clients_daily__v1 = bigquery_etl_query(
|
||||
task_id="account_ecosystem_derived__desktop_clients_daily__v1",
|
||||
destination_table="desktop_clients_daily_v1",
|
||||
dataset_id="account_ecosystem_derived",
|
||||
project_id="moz-fx-data-shared-prod",
|
||||
owner="jklukas@mozilla.com",
|
||||
email=["jklukas@mozilla.com"],
|
||||
date_partition_parameter="submission_date",
|
||||
depends_on_past=False,
|
||||
dag=dag,
|
||||
)
|
||||
|
||||
account_ecosystem_derived__ecosystem_client_id_lookup__v1 = bigquery_etl_query(
|
||||
task_id="account_ecosystem_derived__ecosystem_client_id_lookup__v1",
|
||||
destination_table="ecosystem_client_id_lookup_v1",
|
||||
|
@ -46,9 +58,18 @@ with DAG(
|
|||
dag=dag,
|
||||
)
|
||||
|
||||
account_ecosystem_derived__ecosystem_client_id_lookup__v1.set_upstream(
|
||||
account_ecosystem_derived__ecosystem_user_id_lookup__v1
|
||||
account_ecosystem_derived__fxa_logging_users_daily__v1 = bigquery_etl_query(
|
||||
task_id="account_ecosystem_derived__fxa_logging_users_daily__v1",
|
||||
destination_table="fxa_logging_users_daily_v1",
|
||||
dataset_id="account_ecosystem_derived",
|
||||
project_id="moz-fx-data-shared-prod",
|
||||
owner="jklukas@mozilla.com",
|
||||
email=["jklukas@mozilla.com"],
|
||||
date_partition_parameter="submission_date",
|
||||
depends_on_past=False,
|
||||
dag=dag,
|
||||
)
|
||||
|
||||
wait_for_copy_deduplicate_all = ExternalTaskSensor(
|
||||
task_id="wait_for_copy_deduplicate_all",
|
||||
external_dag_id="copy_deduplicate",
|
||||
|
@ -59,6 +80,13 @@ with DAG(
|
|||
pool="DATA_ENG_EXTERNALTASKSENSOR",
|
||||
)
|
||||
|
||||
account_ecosystem_derived__desktop_clients_daily__v1.set_upstream(
|
||||
wait_for_copy_deduplicate_all
|
||||
)
|
||||
|
||||
account_ecosystem_derived__ecosystem_client_id_lookup__v1.set_upstream(
|
||||
account_ecosystem_derived__ecosystem_user_id_lookup__v1
|
||||
)
|
||||
account_ecosystem_derived__ecosystem_client_id_lookup__v1.set_upstream(
|
||||
wait_for_copy_deduplicate_all
|
||||
)
|
||||
|
@ -66,3 +94,10 @@ with DAG(
|
|||
account_ecosystem_derived__ecosystem_user_id_lookup__v1.set_upstream(
|
||||
wait_for_copy_deduplicate_all
|
||||
)
|
||||
|
||||
account_ecosystem_derived__fxa_logging_users_daily__v1.set_upstream(
|
||||
account_ecosystem_derived__ecosystem_user_id_lookup__v1
|
||||
)
|
||||
account_ecosystem_derived__fxa_logging_users_daily__v1.set_upstream(
|
||||
wait_for_copy_deduplicate_all
|
||||
)
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
friendly_name: AET Clients Daily
|
||||
description: >
|
||||
One row per user per service per day, showing metrics across services.
|
||||
|
||||
The `user_id` and `client_id` are directly related to the `ecosystem_user_id`
|
||||
and `ecosystem_client_id` primitive identifiers for Account Ecosystem Telemetry,
|
||||
but are abstracted to prevent fingerprinting and to provide continuity across
|
||||
user password reset events. In this view, we are guaranteed that a logical user
|
||||
is represented by a consistent `user_id` over time.
|
||||
|
||||
For rows representing client telemetry, this view looks up `user_id` at runtime
|
||||
based on `client_id` so that we can have `user_id` values present for any client
|
||||
that has ever logged in to FxA, even for older rows before the first login.
|
|
@ -0,0 +1,57 @@
|
|||
CREATE OR REPLACE VIEW
  `moz-fx-data-shared-prod.account_ecosystem.aet_clients_daily`
AS
-- Stacks desktop client rows and FxA logging rows into one common column
-- layout, then resolves a user_id for client rows via the client-id lookup.
WITH desktop_rows AS (
  -- Desktop rows carry no canonical_id or event_count of their own, so typed
  -- NULL placeholders keep the column list aligned with fxa_rows.
  SELECT
    submission_date,
    CAST(NULL AS STRING) AS canonical_id,
    ecosystem_client_id_hash,
    'desktop' AS service,
    CAST(NULL AS INT64) AS event_count,
    duration_sum,
    active_hours_sum,
    scalar_parent_browser_engagement_total_uri_count_sum,
    normalized_channel AS channel,
    normalized_os AS os,
    normalized_country_code AS country,
  FROM
    `moz-fx-data-shared-prod.account_ecosystem_derived.desktop_clients_daily_v1`
),
fxa_rows AS (
  -- FxA logging rows already have a canonical_id but no client hash or
  -- browser usage metrics; those columns are typed NULL placeholders.
  SELECT
    submission_date,
    canonical_id,
    CAST(NULL AS STRING) AS ecosystem_client_id_hash,
    -- We likely want to replace oauth_client_id with a human-readable service name.
    FORMAT('fxa - %s', oauth_client_id) AS service,
    event_count,
    CAST(NULL AS INT64) AS duration_sum,
    CAST(NULL AS INT64) AS active_hours_sum,
    CAST(NULL AS INT64) AS scalar_parent_browser_engagement_total_uri_count_sum,
    CAST(NULL AS STRING) AS channel,
    CAST(NULL AS STRING) AS os,
    country_code AS country,
  FROM
    `moz-fx-data-shared-prod.account_ecosystem_derived.fxa_logging_users_daily_v1`
),
combined AS (
  -- UNION ALL matches columns by position, so both CTEs above must list
  -- their columns in the same order.
  SELECT
    *
  FROM
    desktop_rows
  UNION ALL
  SELECT
    *
  FROM
    fxa_rows
)
SELECT
  -- FxA rows supply canonical_id directly; desktop rows get it from the lookup.
  COALESCE(combined.canonical_id, client_lookup.canonical_id) AS user_id,
  combined.ecosystem_client_id_hash AS client_id,
  combined.* EXCEPT (canonical_id, ecosystem_client_id_hash)
FROM
  combined
LEFT JOIN
  `moz-fx-data-shared-prod.account_ecosystem_derived.ecosystem_client_id_lookup_v1` AS client_lookup
ON
  combined.ecosystem_client_id_hash = client_lookup.ecosystem_client_id_hash
|
|
@ -0,0 +1,11 @@
|
|||
-- Destination table for the daily desktop AET aggregation.
-- The column list mirrors, in order, every column emitted by this table's
-- query.sql; the aet_clients_daily view reads all of them, so none may be
-- omitted here.
-- NOTE: IF NOT EXISTS means an already-created table is left untouched; an
-- existing table with the older, narrower schema must be altered separately.
CREATE TABLE IF NOT EXISTS
  `moz-fx-data-shared-prod.account_ecosystem_derived.desktop_clients_daily_v1`(
    submission_date DATE,
    ecosystem_client_id_hash STRING,
    duration_sum INT64,
    -- The query divides tick counts by (3600 / 5); BigQuery's `/` operator
    -- yields FLOAT64, so this sum is not an integer.
    active_hours_sum FLOAT64,
    scalar_parent_browser_engagement_total_uri_count_sum INT64,
    normalized_channel STRING,
    normalized_os STRING,
    normalized_country_code STRING
  )
PARTITION BY
  submission_date
CLUSTER BY
  normalized_channel
|
|
@ -0,0 +1,15 @@
|
|||
friendly_name: AET Desktop Clients Daily
|
||||
description: >
|
||||
One row per desktop client per day aggregating all AET pings received for that client.
|
||||
owners:
|
||||
- jklukas@mozilla.com
|
||||
labels:
|
||||
application: aet
|
||||
schedule: daily
|
||||
incremental: true
|
||||
scheduling:
|
||||
dag_name: bqetl_account_ecosystem
|
||||
# We access a restricted table for getting an HMAC key, so cannot dry run
|
||||
# and must explicitly list referenced tables.
|
||||
referenced_tables:
|
||||
- ['telemetry_stable', 'account_ecosystem_v4']
|
|
@ -0,0 +1,94 @@
|
|||
-- Function to coerce a raw value `v` to a nearby quantile to make fingerprinting
-- more difficult. The method here attempts to keep aggregates fairly close to
-- aggregates over raw values by bucketing based on pairs of quantiles:
--
--   * `uppers`    = the even-offset quantiles, excluding the first and last
--                   entries of the array; these are the bucket boundaries.
--   * `midpoints` = the odd-offset quantiles; each one sits between two
--                   consecutive boundaries and is the value reported for
--                   that bucket.
--
-- For an 11-element quantile array this gives 4 boundaries (offsets 2, 4, 6, 8)
-- and 5 representative values (offsets 1, 3, 5, 7, 9). RANGE_BUCKET(v, uppers)
-- selects the bucket, and the midpoint at the same position is returned.
--
-- Both derived arrays are built with an explicit ORDER BY on the element
-- offset: RANGE_BUCKET requires a sorted array, `midpoints` must line up
-- positionally with `uppers`, and UNNEST does not by itself guarantee that
-- rows come back in array order.
--
-- When adding a new metric, you'll need to pass in a quantile array, sorted
-- ascending. See the following example query:
/*

-- Sample query for generating a quantile list for a new metric.
SELECT
  FORMAT("%T", APPROX_QUANTILES(payload.duration, 10 ignore nulls)),
FROM
  `moz-fx-data-shared-prod.telemetry_stable.account_ecosystem_v4`
WHERE
  DATE(submission_timestamp) BETWEEN "2020-09-01" AND "2020-10-01"

*/
CREATE TEMP FUNCTION quantilify(v ANY TYPE, quantiles ARRAY<INT64>) AS (
  (
    WITH boundaries AS (
      SELECT
        ARRAY(
          SELECT
            n
          FROM
            UNNEST(quantiles) AS n
            WITH OFFSET AS i
          WHERE
            MOD(i, 2) = 0
            AND i
            BETWEEN 1
            AND ARRAY_LENGTH(quantiles) - 2
          ORDER BY
            i
        ) AS uppers,
        ARRAY(
          SELECT
            n
          FROM
            UNNEST(quantiles) AS n
            WITH OFFSET AS i
          WHERE
            MOD(i, 2) = 1
          ORDER BY
            i
        ) AS midpoints,
    )
    SELECT
      midpoints[OFFSET(RANGE_BUCKET(v, uppers))]
    FROM
      boundaries
  )
);
|
||||
|
||||
-- Decrypts the HMAC key that is used below to hash ecosystem_client_id.
WITH hmac_secret AS (
  SELECT
    AEAD.DECRYPT_BYTES(
      (SELECT keyset FROM `moz-fx-dataops-secrets.airflow_query_keys.aet_prod`),
      ciphertext,
      CAST(key_id AS BYTES)
    ) AS value
  FROM
    `moz-fx-data-shared-prod.account_ecosystem_restricted.encrypted_keys_v1`
  WHERE
    key_id = 'aet_hmac_prod'
)
-- One output row per (submission_date, ecosystem_client_id). Each metric is
-- passed through quantilify() with its own quantile list before summing.
SELECT
  DATE(submission_timestamp) AS submission_date,
  -- The raw client id is replaced by its hex-encoded keyed hash.
  TO_HEX(
    udf.hmac_sha256((SELECT value FROM hmac_secret), CAST(payload.ecosystem_client_id AS BYTES))
  ) AS ecosystem_client_id_hash,
  SUM(
    quantilify(
      payload.duration,
      [1, 94, 486, 1934, 5316, 12699, 26003, 41084, 80694, 86400, 2717662]
    )
  ) AS duration_sum,
  -- Ticks are scaled by (3600 / 5) to express the sum in hours.
  -- NOTE(review): this presumes one active tick is 5 seconds; confirm.
  SUM(
    quantilify(
      payload.scalars.parent.browser_engagement_active_ticks,
      [1, 3, 12, 29, 60, 118, 212, 395, 699, 1256, 6649]
    ) / (3600 / 5)
  ) AS active_hours_sum,
  SUM(
    quantilify(
      payload.scalars.parent.browser_engagement_total_uri_count,
      [1, 3, 7, 13, 22, 39, 71, 126, 227, 432, 37284]
    )
  ) AS scalar_parent_browser_engagement_total_uri_count_sum,
  mozfun.stats.mode_last(ARRAY_AGG(normalized_channel)) AS normalized_channel,
  mozfun.stats.mode_last(ARRAY_AGG(normalized_os)) AS normalized_os,
  mozfun.stats.mode_last(ARRAY_AGG(normalized_country_code)) AS normalized_country_code,
FROM
  telemetry.account_ecosystem
WHERE
  DATE(submission_timestamp) = @submission_date
GROUP BY
  submission_date,
  payload.ecosystem_client_id
|
|
@ -18,7 +18,7 @@ WITH unioned AS (
|
|||
aggregated AS (
|
||||
SELECT
|
||||
ecosystem_user_id,
|
||||
array_agg(DISTINCT previous_ecosystem_user_id IGNORE NULLS) AS previous_ecosystem_user_ids
|
||||
ARRAY_AGG(DISTINCT previous_ecosystem_user_id IGNORE NULLS) AS previous_ecosystem_user_ids
|
||||
FROM
|
||||
unioned
|
||||
WHERE
|
||||
|
@ -29,7 +29,7 @@ aggregated AS (
|
|||
SELECT
|
||||
ecosystem_user_id,
|
||||
IF(
|
||||
array_length(previous_ecosystem_user_ids) > 1,
|
||||
ARRAY_LENGTH(previous_ecosystem_user_ids) > 1,
|
||||
ERROR(FORMAT("Found more than 1 previous ID for %s", ecosystem_user_id)),
|
||||
previous_ecosystem_user_ids[SAFE_OFFSET(0)]
|
||||
) AS previous_ecosystem_user_id
|
||||
|
@ -72,7 +72,7 @@ LOOP
|
|||
|
||||
SET checksum_post = (
|
||||
SELECT
|
||||
BIT_XOR(FARM_FINGERPRINT(previous_ecosystem_user_id))
|
||||
BIT_XOR(FARM_FINGERPRINT(TO_JSON_STRING(working_set)))
|
||||
FROM
|
||||
working_set
|
||||
);
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
-- Destination table for the daily FxA logging aggregation.
-- Partitioned by day and clustered by ecosystem_user_id.
CREATE TABLE IF NOT EXISTS
  `moz-fx-data-shared-prod.account_ecosystem_derived.fxa_logging_users_daily_v1`(
    -- Partition key: the day the events were submitted.
    submission_date DATE,
    -- Identifiers: canonical_id is looked up from ecosystem_user_id by the query.
    canonical_id STRING,
    ecosystem_user_id STRING,
    oauth_client_id STRING,
    -- Number of events seen for this user and OAuth client on the day.
    event_count INT64,
    -- country_code is derived from country_name via a static lookup in the query.
    country_name STRING,
    country_code STRING
  )
PARTITION BY
  submission_date
CLUSTER BY
  ecosystem_user_id
|
|
@ -0,0 +1,11 @@
|
|||
friendly_name: AET FxA Logging Users Daily
|
||||
description: >
|
||||
One row per canonical_id per oauth service per day aggregating all AET events received for that user.
|
||||
owners:
|
||||
- jklukas@mozilla.com
|
||||
labels:
|
||||
application: aet
|
||||
schedule: daily
|
||||
incremental: true
|
||||
scheduling:
|
||||
dag_name: bqetl_account_ecosystem
|
|
@ -0,0 +1,30 @@
|
|||
-- Aggregates one day of FxA account ecosystem events to one row per
-- (submission_date, ecosystem_user_id, oauth_client_id).
WITH events_per_user AS (
  SELECT
    DATE(submission_timestamp) AS submission_date,
    ecosystem_user_id,
    oauth_client_id,
    COUNT(*) AS event_count,
    mozfun.stats.mode_last(ARRAY_AGG(country)) AS country_name,
  FROM
    firefox_accounts.account_ecosystem
  WHERE
    DATE(submission_timestamp) = @submission_date
  GROUP BY
    submission_date,
    ecosystem_user_id,
    oauth_client_id
)
-- Both joins are LEFT JOINs, so rows without a lookup match are kept with a
-- NULL canonical_id / country_code.
SELECT
  user_lookup.canonical_id,
  events_per_user.*,
  country_lookup.code AS country_code,
FROM
  events_per_user
LEFT JOIN
  ecosystem_user_id_lookup_v1 AS user_lookup
ON
  events_per_user.ecosystem_user_id = user_lookup.ecosystem_user_id
LEFT JOIN
  static.country_names_v1 AS country_lookup
ON
  events_per_user.country_name = country_lookup.name
|
Загрузка…
Ссылка в новой задаче