[DENG-2854] Search glean per-app derived datasets in shredder (#5165)

This commit is contained in:
Ben Wu 2024-04-04 11:35:24 -04:00 коммит произвёл GitHub
Родитель c5d466ff92
Коммит 7644d1e297
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
2 изменённых файлов: 303 добавлений и 57 удалений

Просмотреть файл

@ -7,14 +7,19 @@ import re
from collections import defaultdict
from dataclasses import dataclass
from functools import partial
from itertools import chain
from multiprocessing.pool import ThreadPool
from typing import Dict, List
import requests
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from ..util.bigquery_id import qualified_table_id
SHARED_PROD = "moz-fx-data-shared-prod"
GLEAN_SCHEMA_ID = "glean_ping_1"
GLEAN_APP_LISTINGS_URL = "https://probeinfo.telemetry.mozilla.org/v2/glean/app-listings"
@dataclass(frozen=True)
@ -96,10 +101,6 @@ CONTEXTUAL_SERVICES_SRC = DeleteSource(
table="telemetry_stable.deletion_request_v4",
field="payload.scalars.parent.deletion_request_context_id",
)
FENIX_SRC = DeleteSource(table="fenix.deletion_request", field=GLEAN_CLIENT_ID)
FIREFOX_IOS_SRC = DeleteSource(
table="firefox_ios.deletion_request", field=GLEAN_CLIENT_ID
)
FXA_HMAC_SRC = DeleteSource(
table="firefox_accounts.fxa_delete_events", field="hmac_user_id"
)
@ -156,28 +157,6 @@ user_id_target = partial(DeleteTarget, field=USER_ID)
context_id_target = partial(DeleteTarget, field=CONTEXT_ID)
DELETE_TARGETS: DeleteIndex = {
# Fenix
client_id_target(table="fenix_derived.firefox_android_clients_v1"): FENIX_SRC,
client_id_target(table="fenix_derived.new_profile_activation_v1"): FENIX_SRC,
client_id_target(
table="fenix_derived.funnel_retention_clients_week_2_v1"
): FENIX_SRC,
client_id_target(
table="fenix_derived.funnel_retention_clients_week_4_v1"
): FENIX_SRC,
# Firefox iOS
client_id_target(
table="firefox_ios_derived.firefox_ios_clients_v1"
): FIREFOX_IOS_SRC,
client_id_target(
table="firefox_ios_derived.clients_activation_v1"
): FIREFOX_IOS_SRC,
client_id_target(
table="firefox_ios_derived.funnel_retention_clients_week_2_v1"
): FIREFOX_IOS_SRC,
client_id_target(
table="firefox_ios_derived.funnel_retention_clients_week_4_v1"
): FIREFOX_IOS_SRC,
# Other
client_id_target(table="search_derived.acer_cohort_v1"): DESKTOP_SRC,
client_id_target(
@ -472,6 +451,7 @@ def find_glean_targets(
and other iterable types, e.g. list[DeleteSource] are not allowed or supported.
"""
datasets = {dataset.dataset_id for dataset in client.list_datasets(project)}
glean_stable_tables = [
table
for tables in pool.map(
@ -486,35 +466,65 @@ def find_glean_targets(
for table in tables
if table.labels.get("schema_id") == GLEAN_SCHEMA_ID
]
channel_to_app_name = get_glean_channel_to_app_name_mapping()
# create mapping of dataset -> (tables containing associated deletion requests)
# construct values as tuples because that is what they must be in the return type
sources: dict[str, tuple[DeleteSource, ...]] = defaultdict(tuple)
app_names = set()
source_doctype = "deletion_request"
for table in glean_stable_tables:
if table.table_id.startswith(source_doctype):
source = DeleteSource(qualified_table_id(table), GLEAN_CLIENT_ID, project)
derived_dataset = re.sub("_stable$", "_derived", table.dataset_id)
channel_name = re.sub("_stable$", "", table.dataset_id)
derived_dataset = channel_name + "_derived"
app_name = channel_to_app_name.get(channel_name)
# append to tuple to use every version of deletion request tables
sources[table.dataset_id] += (source,)
sources[derived_dataset] += (source,)
glean_derived_tables = list(
pool.map(
# find the name of all apps that have a dataset of combined channels
if app_name is not None and app_name != channel_name:
app_names.add(app_name)
sources[app_name + "_derived"] += (source,)
# Use deletion request view containing all channels if found, otherwise use per-channel
# tables as delete sources. Some apps don't have views generated by glean_usage
# because they are skipped in bqetl_project.yaml
for app_name in app_names:
try:
source_view = client.get_table(f"{project}.{app_name}.{source_doctype}")
except NotFound:
pass
else:
source = DeleteSource(
qualified_table_id(source_view), GLEAN_CLIENT_ID, project
)
sources[app_name + "_derived"] = (source,)
glean_derived_tables = [
table
for table in pool.map(
client.get_table,
[
table
for tables in pool.map(
client.list_tables,
chain(
*pool.starmap(
_list_tables,
[
bigquery.DatasetReference(project, dataset_id)
(bigquery.DatasetReference(project, dataset_id), client)
for dataset_id in sources
if dataset_id.endswith("_derived")
],
chunksize=1,
)
for table in tables
],
),
chunksize=1,
)
)
if table.table_type == "TABLE"
]
# handle additional source for deletion requests for things like
# https://bugzilla.mozilla.org/show_bug.cgi?id=1810236
# table must contain client_id at the top level and be partitioned on
@ -523,9 +533,15 @@ def find_glean_targets(
for table in glean_derived_tables:
if table.table_id.startswith(derived_source_prefix):
source = DeleteSource(qualified_table_id(table), CLIENT_ID, project)
stable_dataset = re.sub("_derived$", "_stable", table.dataset_id)
sources[stable_dataset] += (source,)
channel_name = re.sub("_derived$", "", table.dataset_id)
app_name = channel_to_app_name.get(channel_name)
sources[channel_name + "_stable"] += (source,)
sources[table.dataset_id] += (source,)
if app_name is not None and app_name != channel_name:
sources[app_name + "_derived"] += (source,)
return {
**{
# glean stable tables that have a source
@ -554,6 +570,43 @@ def find_glean_targets(
}
def get_glean_channel_to_app_name_mapping() -> Dict[str, str]:
"""Return a dict where key is the channel app id and the value is the shared app name.
e.g. {
"org_mozilla_firefox": "fenix",
"org_mozilla_firefox_beta": "fenix",
"org_mozilla_ios_firefox": "firefox_ios",
"org_mozilla_ios_firefoxbeta": "firefox_ios",
}
"""
response = requests.get(GLEAN_APP_LISTINGS_URL)
response.raise_for_status()
app_listings = response.json()
return {
app["bq_dataset_family"]: app["app_name"]
for app in app_listings
if "bq_dataset_family" in app and "app_name" in app
}
def _list_tables(
dataset_ref: bigquery.DatasetReference,
client: bigquery.Client,
) -> List:
"""Wrap bigquery list_tables and return an empty list for non-existent datasets.
Intended to be used with thread pool map function. Some glean apps do not have
derived datasets so this wrapper handles exceptions when listing them.
"""
try:
return list(client.list_tables(dataset_ref))
except NotFound:
return []
EXPERIMENT_ANALYSIS = "moz-fx-data-experiments"

Просмотреть файл

@ -1,15 +1,62 @@
from multiprocessing.pool import ThreadPool
from unittest import mock
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud.bigquery import DatasetReference
from bigquery_etl.shredder.config import DeleteSource, DeleteTarget, find_glean_targets
from bigquery_etl.shredder.config import (
DeleteSource,
DeleteTarget,
_list_tables,
find_glean_targets,
get_glean_channel_to_app_name_mapping,
)
GLEAN_APP_LISTING = [
{
"app_channel": "release",
"app_id": "org.mozilla.firefox",
"app_name": "fenix",
"bq_dataset_family": "org_mozilla_firefox",
},
{
"app_channel": "beta",
"app_id": "org.mozilla.firefox_beta",
"app_name": "fenix",
"bq_dataset_family": "org_mozilla_firefox_beta",
},
{
"app_channel": "nightly",
"app_id": "org.mozilla.fenix.nightly",
"app_name": "fenix",
"bq_dataset_family": "org_mozilla_fenix_nightly",
},
{
"app_channel": "release",
"app_id": "org.mozilla.focus",
"app_name": "focus_android",
"bq_dataset_family": "org_mozilla_focus",
},
{
"app_channel": "beta",
"app_id": "org.mozilla.focus.beta",
"app_name": "focus_android",
"bq_dataset_family": "org_mozilla_focus_beta",
},
]
class FakeClient:
def list_datasets(self, project):
return [
bigquery.DatasetReference(project, f"{app}_{suffix}")
for app in ["fenix", "focus_android"]
for app in [
"org_mozilla_firefox",
"org_mozilla_firefox_beta",
"org_mozilla_focus",
"org_mozilla_focus_beta",
]
for suffix in ["stable", "derived"]
]
@ -18,11 +65,14 @@ class FakeClient:
if dataset_ref.dataset_id.endswith("stable"):
table_ids = ["metrics_v1", "deletion_request_v1", "migration_v1"]
labels["schema_id"] = "glean_ping_1"
elif dataset_ref.dataset_id == "focus_android_derived":
elif dataset_ref.dataset_id in {
"org_mozilla_focus_derived",
"org_mozilla_focus_beta_derived",
}:
table_ids = [
"additional_deletion_requests_v1",
"additional_deletion_requests_v1", # should be ignored
"clients_daily_v1",
"dau_v1",
"dau_v1", # aggregated, no client_id
]
elif dataset_ref.dataset_id.endswith("derived"):
table_ids = ["clients_daily_v1"]
@ -42,7 +92,8 @@ class FakeClient:
def get_table(self, table_ref):
table = bigquery.Table(table_ref)
if table_ref.dataset_id.endswith("stable"):
table._properties[table._PROPERTY_TO_API_FIELD["type"]] = "TABLE"
if table.dataset_id.endswith("stable"):
table.schema = [
bigquery.SchemaField(
"client_info",
@ -51,9 +102,12 @@ class FakeClient:
[bigquery.SchemaField("client_id", "STRING")],
)
]
elif table_ref.table_id in {
elif table.table_id in {
"additional_deletion_requests_v1",
"clients_daily_v1",
} or table.dataset_id in {
"fenix",
"focus_android",
}:
table.schema = [bigquery.SchemaField("client_id", "STRING")]
else:
@ -61,25 +115,76 @@ class FakeClient:
return table
def test_glean_targets():
@mock.patch("bigquery_etl.shredder.config.requests")
def test_glean_targets(mock_requests):
mock_response = mock.Mock()
mock_response.json.return_value = GLEAN_APP_LISTING
mock_requests.get.return_value = mock_response
with ThreadPool(1) as pool:
targets = find_glean_targets(pool, FakeClient())
# convert tuples to sets because additional_deletion_requests are in
# non-deterministic order due to multiprocessing
# order doesn't matter in real execution
for source, target in targets.items():
targets[source] = set(target)
assert targets == {
**{
target: (
target: { # firefox release tables
DeleteSource(
table="fenix_stable.deletion_request_v1",
table="org_mozilla_firefox_stable.deletion_request_v1",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
)
}
for target in [
DeleteTarget(
table="fenix_stable.metrics_v1",
table="org_mozilla_firefox_stable.metrics_v1",
field=("client_info.client_id",),
project="moz-fx-data-shared-prod",
),
DeleteTarget(
table="org_mozilla_firefox_derived.clients_daily_v1",
field=("client_id",),
project="moz-fx-data-shared-prod",
),
]
},
**{
target: { # firefox beta tables
DeleteSource(
table="org_mozilla_firefox_beta_stable.deletion_request_v1",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for target in [
DeleteTarget(
table="org_mozilla_firefox_beta_stable.metrics_v1",
field=("client_info.client_id",),
project="moz-fx-data-shared-prod",
),
DeleteTarget(
table="org_mozilla_firefox_beta_derived.clients_daily_v1",
field=("client_id",),
project="moz-fx-data-shared-prod",
),
]
},
**{
target: { # firefox combined-channel tables
DeleteSource(
table="fenix.deletion_request",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for target in [
DeleteTarget(
table="fenix_derived.clients_daily_v1",
field=("client_id",),
@ -88,31 +193,119 @@ def test_glean_targets():
]
},
**{
target: (
target: { # focus release tables
DeleteSource(
table="focus_android_stable.deletion_request_v1",
table="org_mozilla_focus_stable.deletion_request_v1",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
DeleteSource(
table="focus_android_derived.additional_deletion_requests_v1",
table="org_mozilla_focus_derived.additional_deletion_requests_v1",
field="client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
)
}
for target in [
DeleteTarget(
table="focus_android_stable.metrics_v1",
table="org_mozilla_focus_stable.metrics_v1",
field=("client_info.client_id",) * 2,
project="moz-fx-data-shared-prod",
),
DeleteTarget(
table="focus_android_derived.clients_daily_v1",
table="org_mozilla_focus_derived.clients_daily_v1",
field=("client_id",) * 2,
project="moz-fx-data-shared-prod",
),
]
},
**{
target: { # focus beta tables
DeleteSource(
table="org_mozilla_focus_beta_stable.deletion_request_v1",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
DeleteSource(
table="org_mozilla_focus_beta_derived.additional_deletion_requests_v1",
field="client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for target in [
DeleteTarget(
table="org_mozilla_focus_beta_stable.metrics_v1",
field=("client_info.client_id",) * 2,
project="moz-fx-data-shared-prod",
),
DeleteTarget(
table="org_mozilla_focus_beta_derived.clients_daily_v1",
field=("client_id",) * 2,
project="moz-fx-data-shared-prod",
),
]
},
**{
target: { # focus combined-channel tables
DeleteSource(
table="focus_android.deletion_request",
field="client_info.client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
DeleteSource(
table="org_mozilla_focus_beta_derived.additional_deletion_requests_v1",
field="client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
DeleteSource(
table="org_mozilla_focus_derived.additional_deletion_requests_v1",
field="client_id",
project="moz-fx-data-shared-prod",
conditions=(),
),
}
for target in [
DeleteTarget(
table="focus_android_derived.clients_daily_v1",
field=("client_id",) * 3,
project="moz-fx-data-shared-prod",
),
]
},
}
@mock.patch("bigquery_etl.shredder.config.requests")
def test_glean_channel_app_mapping(mock_requests):
mock_response = mock.Mock()
mock_response.json.return_value = GLEAN_APP_LISTING
mock_requests.get.return_value = mock_response
actual = get_glean_channel_to_app_name_mapping()
expected = {
"org_mozilla_firefox": "fenix",
"org_mozilla_firefox_beta": "fenix",
"org_mozilla_fenix_nightly": "fenix",
"org_mozilla_focus": "focus_android",
"org_mozilla_focus_beta": "focus_android",
}
assert actual == expected
def test_list_tables_wrapper_empty():
"""List tables wrapper should return an empty list when dataset doesn't exist."""
class EmptyFakeClient(FakeClient):
def list_tables(self, dataset_ref):
raise NotFound("not found")
tables = _list_tables(DatasetReference("project", "dataset"), EmptyFakeClient())
assert tables == []