Bug 1632635 - FxA Amplitude export for active events (#941)

* WIP: Initial implementation of FxA Amplitude export

* Use submission_date parameter

* Add hmac-sha256 SQL implementation

* Escape language column name

Co-Authored-By: Jeff Klukas <jeff@klukas.net>

* Use hmac_sha256; update for review feedback

* Reformat sql files

* Add docs for HMAC implementation

* Validate hmac_sha256 against NIST test vectors

* Add filepath as from_text arg

Co-authored-by: Daniel Thorn <dthorn@mozilla.com>

* Explicitly use named argument

Co-authored-by: Daniel Thorn <dthorn@mozilla.com>

* Add docs for hmac validation

* WIP: Derive os_used_week/month as incremental query

* Retrieve hmac_key from encrypted keys table

Co-authored-by: Jeff Klukas <jeff@klukas.net>

* Remove fxa_hmac param

* Reformat SQL files

* Use bytes_seen for os_used

* Rename udfs

* Format UDF sql

* Don't include NULL os values

* Don't include NULL user properties

* Update comment for UDF

* Use fully-named datasets, not fxa*

* Cast key_id to bytes

* Fix failing tests

* Fix test failures

* Use new dataset for view query

* Add access denied exception for secret access

* Remove flake8 changes

* Update description of fxa_amplitude_export

Co-authored-by: Jeff Klukas <jeff@klukas.net>

* Remove version suffix from view

Co-authored-by: Jeff Klukas <jeff@klukas.net>
Co-authored-by: Daniel Thorn <dthorn@mozilla.com>
This commit is contained in:
Frank Bertsch 2020-05-05 22:37:26 -04:00 коммит произвёл GitHub
Родитель 069c24ac60
Коммит bd7be1606c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 2122 добавлений и 11 удалений

Просмотреть файл

@ -41,12 +41,28 @@ class RawUdf:
with open(filepath) as f: with open(filepath) as f:
text = f.read() text = f.read()
name = basename.replace(".sql", "")
dataset = os.path.basename(dirpath)
try:
return RawUdf.from_text(text, dataset, name, filepath)
except ValueError as e:
raise ValueError(str(e) + f" in {filepath}")
@staticmethod
def from_text(text, dataset, name, filepath=None, is_defined=True):
"""Create a RawUdf instance from text.
If is_defined is False, then the UDF does not
need to be defined in the text; it could be
just tests.
"""
sql = sqlparse.format(text, strip_comments=True) sql = sqlparse.format(text, strip_comments=True)
statements = [s for s in sqlparse.split(sql) if s.strip()] statements = [s for s in sqlparse.split(sql) if s.strip()]
prod_name = basename.replace(".sql", "") prod_name = name
persistent_name = os.path.basename(dirpath) + "." + prod_name persistent_name = f"{dataset}.{name}"
temp_name = os.path.basename(dirpath) + "_" + prod_name temp_name = f"{dataset}_{name}"
internal_name = None internal_name = None
definitions = [] definitions = []
@ -68,10 +84,10 @@ class RawUdf:
tests.append(s) tests.append(s)
for name in (prod_name, internal_name): for name in (prod_name, internal_name):
if not UDF_NAME_RE.match(name): if is_defined and not UDF_NAME_RE.match(name):
raise ValueError( raise ValueError(
f"Invalid UDF name {name}: Must start with alpha char, " f"Invalid UDF name {name}: Must start with alpha char, "
f"limited to chars {UDF_CHAR}, be at most 256 chars long." f"limited to chars {UDF_CHAR}, be at most 256 chars long"
) )
# find usages of both persistent and temporary UDFs # find usages of both persistent and temporary UDFs
@ -79,12 +95,13 @@ class RawUdf:
dependencies = [".".join(t) for t in dependencies] dependencies = [".".join(t) for t in dependencies]
dependencies.extend(re.findall(TEMP_UDF_RE, "\n".join(definitions))) dependencies.extend(re.findall(TEMP_UDF_RE, "\n".join(definitions)))
if internal_name is None: if is_defined:
raise ValueError( if internal_name is None:
f"Expected a UDF named {persistent_name} or {temp_name} " raise ValueError(
f"to be defined in {filepath}" f"Expected a UDF named {persistent_name} or {temp_name} "
) f"to be defined"
dependencies.remove(internal_name) )
dependencies.remove(internal_name)
return RawUdf( return RawUdf(
internal_name, internal_name,

Просмотреть файл

@ -46,6 +46,7 @@ SKIP = {
"sql/search/search_clients_last_seen_v1/view.sql", "sql/search/search_clients_last_seen_v1/view.sql",
"sql/search/search_clients_last_seen/view.sql", "sql/search/search_clients_last_seen/view.sql",
"sql/telemetry_derived/deviations_v1/query.sql", "sql/telemetry_derived/deviations_v1/query.sql",
"sql/firefox_accounts_derived/fxa_amplitude_export_v1/query.sql",
# Already exists (and lacks an "OR REPLACE" clause) # Already exists (and lacks an "OR REPLACE" clause)
"sql/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql", "sql/org_mozilla_firefox_derived/clients_first_seen_v1/init.sql",
"sql/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql", "sql/org_mozilla_firefox_derived/clients_last_seen_v1/init.sql",

Просмотреть файл

@ -0,0 +1,125 @@
CREATE OR REPLACE VIEW
  `moz-fx-data-shared-prod.firefox_accounts.fxa_amplitude_export`
AS
WITH active_users AS (
  SELECT
    -- Project the per-OS 28-day activity bit map into arrays of OS names
    -- active in the trailing 1, 7, and 28 days respectively.
    `moz-fx-data-shared-prod`.udf.active_values_from_days_seen_map(os_used_month, 0, 1) AS os_used_day,
    `moz-fx-data-shared-prod`.udf.active_values_from_days_seen_map(os_used_month, -6, 7) AS os_used_week,
    `moz-fx-data-shared-prod`.udf.active_values_from_days_seen_map(os_used_month, -27, 28) AS os_used_month,
    * EXCEPT (days_seen_bits, os_used_month)
  FROM
    `moz-fx-data-shared-prod`.firefox_accounts_derived.fxa_amplitude_export_v1
  WHERE
    -- Only users whose most recent active day is the current day (bit 0 set).
    `moz-fx-data-shared-prod`.udf.pos_of_trailing_set_bit(days_seen_bits) = 0
),
active_events AS (
  SELECT
    submission_timestamp,
    user_id,
    insert_id,
    'fxa_activity - active' AS event_type,
    timestamp,
    TO_JSON_STRING(STRUCT(services, oauth_client_ids)) AS event_properties,
    -- Fixed: was `user_events`, which did not match the `user_properties`
    -- column of the other UNION ALL branch below.
    '' AS user_properties
  FROM
    active_users
),
user_properties AS (
  SELECT
    submission_timestamp,
    user_id,
    '' AS insert_id,
    '$identify' AS event_type,
    timestamp,
    '' AS event_properties,
    -- We don't want to include user_properties if they are null, so we need
    -- to list them out explicitly and filter with WHERE
    CONCAT(
      "{",
      ARRAY_TO_STRING(
        ARRAY(
          SELECT
            CONCAT(TO_JSON_STRING(key), ":", value)
          FROM
            (
              SELECT AS STRUCT
                "region" AS key,
                TO_JSON_STRING(region) AS value
              UNION ALL
              SELECT AS STRUCT
                "country" AS key,
                TO_JSON_STRING(country) AS value
              UNION ALL
              SELECT AS STRUCT
                -- `language` is a reserved keyword, so the column reference
                -- must be escaped; the JSON key stays lowercase.
                "language" AS key,
                TO_JSON_STRING(`language`) AS value
              UNION ALL
              SELECT AS STRUCT
                "os_used_day" AS key,
                TO_JSON_STRING(os_used_day) AS value
              UNION ALL
              SELECT AS STRUCT
                "os_used_week" AS key,
                TO_JSON_STRING(os_used_week) AS value
              UNION ALL
              SELECT AS STRUCT
                "os_used_month" AS key,
                TO_JSON_STRING(os_used_month) AS value
              UNION ALL
              SELECT AS STRUCT
                "sync_device_count" AS key,
                TO_JSON_STRING(sync_device_count) AS value
              UNION ALL
              SELECT AS STRUCT
                "sync_active_devices_day" AS key,
                TO_JSON_STRING(sync_active_devices_day) AS value
              UNION ALL
              SELECT AS STRUCT
                "sync_active_devices_week" AS key,
                TO_JSON_STRING(sync_active_devices_week) AS value
              UNION ALL
              SELECT AS STRUCT
                "sync_active_devices_month" AS key,
                TO_JSON_STRING(sync_active_devices_month) AS value
              UNION ALL
              SELECT AS STRUCT
                "ua_version" AS key,
                TO_JSON_STRING(ua_version) AS value
              UNION ALL
              SELECT AS STRUCT
                "ua_browser" AS key,
                TO_JSON_STRING(ua_browser) AS value
              UNION ALL
              SELECT AS STRUCT
                "app_version" AS key,
                TO_JSON_STRING(app_version) AS value
              UNION ALL
              SELECT AS STRUCT
                "$postInsert" AS key,
                TO_JSON_STRING(STRUCT(fxa_services_used)) AS value
            )
          WHERE
            -- TO_JSON_STRING renders SQL NULL as the string "null"; skip those.
            value != "null"
        ),
        ","
      ),
      "}"
    ) AS user_properties
  FROM
    active_users
),
all_events AS (
  SELECT
    *
  FROM
    active_events
  UNION ALL
  SELECT
    *
  FROM
    user_properties
)
SELECT
  *
FROM
  all_events

Просмотреть файл

@ -0,0 +1,35 @@
-- Initialize the incremental FxA Amplitude export table with an explicit
-- schema but zero rows; the daily query.sql populates it per partition.
CREATE OR REPLACE TABLE
  `moz-fx-data-shared-prod`.firefox_accounts_derived.fxa_amplitude_export_v1
PARTITION BY
  (DATE(submission_timestamp))
CLUSTER BY
  (user_id)
AS
WITH columns AS (
  -- Every column is an explicitly CAST NULL so the created table carries the
  -- intended type for each field.
  SELECT
    CAST(NULL AS TIMESTAMP) AS submission_timestamp,
    CAST(NULL AS STRING) AS user_id,
    CAST(NULL AS STRING) AS insert_id,
    CAST(NULL AS DATETIME) AS timestamp,
    CAST(NULL AS STRING) AS region,
    CAST(NULL AS STRING) AS country,
    CAST(NULL AS STRING) AS `language`,
    CAST(NULL AS ARRAY<STRING>) AS services,
    CAST(NULL AS ARRAY<STRING>) AS oauth_client_ids,
    CAST(NULL AS ARRAY<STRING>) AS fxa_services_used,
    -- Map of OS name -> 28-day activity bit pattern.
    CAST(NULL AS ARRAY<STRUCT<key STRING, value INT64>>) AS os_used_month,
    CAST(NULL AS INT64) AS sync_device_count,
    CAST(NULL AS INT64) AS sync_active_devices_day,
    CAST(NULL AS INT64) AS sync_active_devices_week,
    CAST(NULL AS INT64) AS sync_active_devices_month,
    CAST(NULL AS STRING) AS ua_version,
    CAST(NULL AS STRING) AS ua_browser,
    CAST(NULL AS FLOAT64) AS app_version,
    -- Removed trailing comma after the final column (invalid SELECT-list syntax).
    CAST(NULL AS INT64) AS days_seen_bits
)
SELECT
  *
FROM
  columns
WHERE
  -- Create the table empty; only the schema matters here.
  FALSE

Просмотреть файл

@ -0,0 +1,10 @@
friendly_name: FxA Amplitude Export
description: >
  Derived from FxA logs, this table contains active events and user property
  updates for FxA users. A view of this table is exported to Amplitude.
owners:
  - frank@mozilla.com
labels:
  application: FxA
  incremental: true
  schedule: daily

Просмотреть файл

@ -0,0 +1,130 @@
-- Incremental daily build of the FxA Amplitude export: aggregates one day of
-- FxA auth-server log events per user and merges with yesterday's state.
WITH hmac_key AS (
  -- Decrypt the HMAC key used to pseudonymize user ids; key material is
  -- stored AEAD-encrypted in a restricted keys table.
  SELECT
    AEAD.DECRYPT_BYTES(
      (SELECT keyset FROM `moz-fx-dataops-secrets.airflow_query_keys.fxa_prod`),
      ciphertext,
      CAST(key_id AS BYTES)
    ) AS value
  FROM
    `moz-fx-data-shared-prod.firefox_accounts_derived.encrypted_keys_v1`
  WHERE
    key_id = 'fxa_hmac_prod'
),
base_events AS (
  -- Activity-indicating events for the target day only.
  SELECT
    *
  FROM
    `moz-fx-fxa-prod-0712.fxa_prod_logs.docker_fxa_auth_20*`
  WHERE
    -- Fixed: %y is the two-digit calendar year; the previous %g (ISO
    -- week-numbering year) selects the wrong table suffix for dates near
    -- calendar-year boundaries.
    _TABLE_SUFFIX = FORMAT_DATE('%y%m%d', @submission_date)
    AND jsonPayload.fields.event_type IN (
      'fxa_activity - cert_signed',
      'fxa_activity - access_token_checked',
      'fxa_activity - access_token_created'
    )
    AND jsonPayload.fields.user_id IS NOT NULL
),
grouped_by_user AS (
  SELECT
    -- Pseudonymize the FxA user id with a keyed HMAC before export.
    TO_HEX(
      udf.hmac_sha256((SELECT * FROM hmac_key), CAST(jsonPayload.fields.user_id AS BYTES))
    ) AS user_id,
    MIN(CONCAT(insertId, '-user')) AS insert_id,
    -- to prevent weirdness from timestamp field, use provided
    -- submission date parameter as timestamp
    CAST(@submission_date AS DATETIME) AS timestamp,
    -- Amplitude properties, scalars
    `moz-fx-data-shared-prod`.udf.mode_last(ARRAY_AGG(jsonPayload.fields.region)) AS region,
    `moz-fx-data-shared-prod`.udf.mode_last(ARRAY_AGG(jsonPayload.fields.country)) AS country,
    `moz-fx-data-shared-prod`.udf.mode_last(ARRAY_AGG(jsonPayload.fields.`language`)) AS `language`,
    -- Event properties, arrays
    ARRAY_AGG(
      DISTINCT JSON_EXTRACT_SCALAR(jsonPayload.fields.event_properties, "$.service") IGNORE NULLS
    ) AS services,
    ARRAY_AGG(
      DISTINCT JSON_EXTRACT_SCALAR(
        jsonPayload.fields.event_properties,
        "$.oauth_client_id"
      ) IGNORE NULLS
    ) AS oauth_client_ids,
    -- User properties, arrays
    ARRAY_AGG(
      DISTINCT JSON_EXTRACT_SCALAR(
        jsonPayload.fields.user_properties,
        "$['$append'].fxa_services_used"
      ) IGNORE NULLS
    ) AS fxa_services_used,
    ARRAY_AGG(DISTINCT jsonPayload.fields.os_name IGNORE NULLS) AS os_used_month,
    -- User properties, scalars
    MAX(
      CAST(JSON_EXTRACT_SCALAR(jsonPayload.fields.user_properties, "$.sync_device_count") AS INT64)
    ) AS sync_device_count,
    MAX(
      CAST(
        JSON_EXTRACT_SCALAR(
          jsonPayload.fields.user_properties,
          "$.sync_active_devices_day"
        ) AS INT64
      )
    ) AS sync_active_devices_day,
    MAX(
      CAST(
        JSON_EXTRACT_SCALAR(
          jsonPayload.fields.user_properties,
          "$.sync_active_devices_week"
        ) AS INT64
      )
    ) AS sync_active_devices_week,
    MAX(
      CAST(
        JSON_EXTRACT_SCALAR(
          jsonPayload.fields.user_properties,
          "$.sync_active_devices_month"
        ) AS INT64
      )
    ) AS sync_active_devices_month,
    `moz-fx-data-shared-prod`.udf.mode_last(
      ARRAY_AGG(
        JSON_EXTRACT_SCALAR(jsonPayload.fields.user_properties, "$.ua_version") IGNORE NULLS
      )
    ) AS ua_version,
    `moz-fx-data-shared-prod`.udf.mode_last(
      ARRAY_AGG(
        -- Fixed: previously extracted "$.ua_version" (copy/paste error);
        -- ua_browser must read its own field.
        JSON_EXTRACT_SCALAR(jsonPayload.fields.user_properties, "$.ua_browser") IGNORE NULLS
      )
    ) AS ua_browser,
    MAX(CAST(jsonPayload.fields.app_version AS FLOAT64)) AS app_version,
    -- Bit 0 set: this user was active on @submission_date.
    CAST(TRUE AS INT64) AS days_seen_bits
  FROM
    base_events
  GROUP BY
    user_id
),
_previous AS (
  -- Yesterday's accumulated state; drop users whose 28-day window becomes
  -- empty after shifting one day forward.
  SELECT
    * EXCEPT (submission_timestamp)
  FROM
    firefox_accounts_derived.fxa_amplitude_export_v1
  WHERE
    DATE(submission_timestamp) = DATE_SUB(@submission_date, INTERVAL 1 DAY)
    AND udf.shift_28_bits_one_day(days_seen_bits) > 0
)
SELECT
  CAST(@submission_date AS TIMESTAMP) AS submission_timestamp,
  _current.* REPLACE (
    COALESCE(_current.user_id, _previous.user_id) AS user_id,
    udf.combine_adjacent_days_28_bits(
      _previous.days_seen_bits,
      _current.days_seen_bits
    ) AS days_seen_bits,
    udf.combine_days_seen_maps(
      _previous.os_used_month,
      -- Each OS name seen today becomes a map entry with bit 0 set.
      ARRAY(SELECT STRUCT(key, CAST(TRUE AS INT64) AS value) FROM _current.os_used_month AS key)
    ) AS os_used_month
  )
FROM
  grouped_by_user _current
FULL OUTER JOIN
  _previous
USING
  (user_id)

Просмотреть файл

@ -0,0 +1,4 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
"""Validation Tests."""

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,55 @@
"""
Validate HMAC-SHA256 implementation against the NIST test vectors.
The vectors are located in tests/validation/data/hmac_sha256_validation.json.
"""
import json
import pytest
from bigquery_etl.parse_udf import read_udf_dirs, RawUdf, udf_tests_sql
from google.cloud import bigquery
# Path (relative to the repository root) of the NIST HMAC-SHA256 test vectors.
validation_data_file = "tests/validation/data/hmac_sha256_validation.json"
def udfs():
    """Parse and return all UDF and assertion definitions the generated tests need."""
    udf_dirs = ("tests/assert", "udf", "udf_js")
    return read_udf_dirs(*udf_dirs)
def load_data():
    """Return the list of test vectors stored under "data" in the validation file."""
    with open(validation_data_file, "r") as data_file:
        parsed = json.load(data_file)
    return parsed["data"]
def generate_raw_udf(test_cases):
    """Generate a SQL test for each instance in hmac_sha256_validation.json.

    Each test case supplies Key, Msg, Mac, and Tlen fields; every vector
    becomes one assert_equals statement, and the statements are bundled into
    a single RawUdf (is_defined=False: the text holds only tests, no UDF body).
    """
    template = (
        "SELECT assert_equals('{Mac}',"
        "TO_HEX(SUBSTR(udf.hmac_sha256(FROM_HEX('{Key}'),"
        "FROM_HEX('{Msg}')),1,{Tlen})));"
    )
    joined_sql = "\n".join(template.format(**case) for case in test_cases)
    return RawUdf.from_text(joined_sql, "udf", "hmac_sha256", is_defined=False)
def generate_sql():
    """Build runnable SQL: validation statements combined with the UDFs they call."""
    raw_udf = generate_raw_udf(load_data())
    return udf_tests_sql(raw_udf, udfs())
@pytest.mark.parametrize("sql", generate_sql())
def test_validate_hmac_sha256(sql):
    """Run one generated validation statement against BigQuery.

    Requires GCP credentials; assert_equals inside the SQL raises a query
    error on any mismatch, which result() surfaces as a test failure.
    """
    config = bigquery.QueryJobConfig(use_legacy_sql=False)
    client = bigquery.Client()
    # result() blocks until completion so SQL-level failures fail the test.
    client.query(sql, job_config=config).result()

Просмотреть файл

@ -0,0 +1,58 @@
/*
Given a map representing per-key activity (one 28-day bit pattern per STRING
key, where bit 0 is the most recent day and higher bits are earlier days),
return the array of keys that were active during the requested window.

start_offset: the oldest day of the window, relative to the most recent day
  (0 = most recent day, negative = earlier); should be at most 0.
n_bits: width of the window in days, extending forward from start_offset;
  should be at most the remaining bits.

Examples: (0, 1) = the most recent day; (-6, 7) = the trailing week;
(-27, 28) = the trailing 28 days.
*/
CREATE OR REPLACE FUNCTION udf.active_values_from_days_seen_map(
  days_seen_bits_map ARRAY<STRUCT<key STRING, value INT64>>,
  start_offset INT64,
  n_bits INT64
) AS (
  ARRAY(
    SELECT
      DISTINCT key
    FROM
      UNNEST(days_seen_bits_map)
    WHERE
      -- TODO: Use udf.bits28_active_in_range when it's available
      -- Shift left to drop bits newer than the window, then shift right to
      -- drop bits older than the window (BigQuery >> zero-fills, no sign
      -- extension); any remaining set bit means activity in the window.
      BIT_COUNT(value << (64 + start_offset - 1) >> (64 - n_bits)) > 0
  )
);
-- Tests
SELECT
  -- Most recent day only: both keys have bit 0 set.
  assert_array_equals(
    ['a', 'b'],
    udf.active_values_from_days_seen_map(
      [STRUCT('a' AS key, 1 AS value), STRUCT('b' AS key, 3 AS value)],
      0,
      1
    )
  ),
  -- Window of days 8-14 ago: only 'a' (bit 11 = 2048) falls inside.
  assert_array_equals(
    ['a'],
    udf.active_values_from_days_seen_map(
      [STRUCT('a' AS key, 2048 AS value), STRUCT('b' AS key, 3 AS value)],
      -14,
      7
    )
  ),
  -- Trailing week: only 'b' (bits 0 and 1) falls inside.
  assert_array_equals(
    ['b'],
    udf.active_values_from_days_seen_map(
      [STRUCT('a' AS key, 2048 AS value), STRUCT('b' AS key, 3 AS value)],
      -6,
      7
    )
  ),
  -- Full 28-day window: both keys active.
  assert_array_equals(
    ['a', 'b'],
    udf.active_values_from_days_seen_map(
      [STRUCT('a' AS key, 1 AS value), STRUCT('b' AS key, 3 AS value)],
      -27,
      28
    )
  );

Просмотреть файл

@ -0,0 +1,69 @@
/*
The "clients_last_seen" class of tables represent various types of client
activity within a 28-day window as bit patterns.
This function takes in two arrays of structs (aka maps) where each entry gives the
bit pattern for days in which we saw a ping for a given user in a given
key. We combine the bit patterns for the previous day and the
current day, returning a single map.
A key present only in prev is dropped once its shifted pattern reaches 0
(fell out of the 28-day window); a key present only in curr is appended as-is.
See `udf.combine_experiment_days` for a more specific example of this approach.
*/
CREATE OR REPLACE FUNCTION udf.combine_days_seen_maps(
  -- Yesterday's map of key -> 28-day bit pattern.
  prev ARRAY<STRUCT<key STRING, value INT64>>,
  -- Today's map of key -> bit pattern (typically just bit 0).
  curr ARRAY<STRUCT<key STRING, value INT64>>
) AS (
  -- The below is logically a FULL JOIN, but BigQuery returns error
  -- "Array scan is not allowed with FULL JOIN" so we have to do two
  -- separate scans.
  ARRAY_CONCAT(
    -- Keys present in prev (and potentially in curr too)
    ARRAY(
      SELECT AS STRUCT
        key,
        udf.combine_adjacent_days_28_bits(prev.value, curr.value) AS value
      FROM
        UNNEST(prev) AS prev
      LEFT JOIN
        UNNEST(curr) AS curr
      USING
        (key)
      WHERE
        -- Drop keys whose activity has aged out of the 28-day window.
        udf.combine_adjacent_days_28_bits(prev.value, curr.value) > 0
    ),
    -- Keys present in curr only
    ARRAY(
      SELECT AS STRUCT
        key,
        curr.value
      FROM
        UNNEST(curr) AS curr
      LEFT JOIN
        UNNEST(prev) AS prev
      USING
        (key)
      WHERE
        prev IS NULL
    )
  )
);
-- Tests
-- key1: 1<<1 | 1 = 3; key2: 3<<1 = 6 (no curr); key3: bit 27 shifts out of
-- the window, leaving only today's bit (1).
SELECT
  assert_array_equals(
    [
      STRUCT("key1" AS key, 3 AS value),
      STRUCT("key2" AS key, 6 AS value),
      STRUCT("key3" AS key, 1 AS value)
    ],
    udf.combine_days_seen_maps(
      [
        STRUCT("key1" AS key, 1 AS value),
        STRUCT("key2" AS key, 3 AS value),
        STRUCT("key3" AS key, 1 << 27 AS value)
      ],
      [STRUCT("key1" AS key, 1 AS value), STRUCT("key3" AS key, 1 AS value)]
    )
  );

22
udf/hmac_sha256.sql Normal file
Просмотреть файл

@ -0,0 +1,22 @@
/*
Given a key and message, return the HMAC-SHA256 hash:
HMAC(K, m) = H((K' XOR opad) || H((K' XOR ipad) || m))
where K' is the key hashed if longer than the 64-byte SHA-256 block size,
then zero-padded to 64 bytes; opad is 0x5c repeated and ipad is 0x36 repeated.
This algorithm can be found in Wikipedia:
https://en.wikipedia.org/wiki/HMAC#Implementation
This implementation is validated against the NIST test vectors.
See tests/validation/hmac_sha256.py for more information.
*/
CREATE OR REPLACE FUNCTION udf.hmac_sha256(key BYTES, message BYTES) AS (
  SHA256(
    CONCAT(
      RPAD(IF(BYTE_LENGTH(key) > 64, SHA256(key), key), 64, b'\x00') ^ REPEAT(b'\x5c', 64),
      SHA256(
        CONCAT(
          RPAD(IF(BYTE_LENGTH(key) > 64, SHA256(key), key), 64, b'\x00') ^ REPEAT(b'\x36', 64),
          message
        )
      )
    )
  )
);