`get_addons_per_client` had a bug: it treated `active_addons` as a dictionary of addon_guid -> addon_metadata, which is how the longitudinal table structured it, but the clients_daily table models `active_addons` as a plain list of addon_metadata records.

This patch also moves the functions that were nested inside `extract_telemetry` out to module level to aid clarity.
This commit is contained in:
Victor Ng 2019-09-24 13:10:10 -07:00 коммит произвёл Anthony Miyaguchi
Родитель 9efff104db
Коммит 86eb9a9a44
1 изменённых файлов: 70 добавлений и 63 удалений

Просмотреть файл

@ -26,6 +26,72 @@ MAIN_SUMMARY_PATH = "s3://telemetry-parquet/main_summary/v4/"
ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime("%Y%m%d")
def is_valid_addon(broadcast_amo_whitelist, guid, addon):
    """Decide whether a single add-on should be kept.

    An add-on is rejected as soon as one of these holds, checked in order:
    ``addon.is_system`` is set, ``addon.app_disabled`` is set,
    ``addon.type`` is not ``"extension"``, ``addon.user_disabled`` is set,
    ``addon.foreign_install`` is set, ``guid`` is absent from
    ``broadcast_amo_whitelist.value``, or ``guid`` is the Pioneer opt-in
    add-on.

    :param broadcast_amo_whitelist: object whose ``.value`` supports
        membership tests on add-on GUIDs.
    :param guid: GUID of the add-on under consideration.
    :param addon: add-on metadata record exposing the attributes above.
    :return: True if the add-on passes every check, False otherwise.
    """
    if addon.is_system:
        return False
    if addon.app_disabled:
        return False
    if addon.type != "extension":
        return False
    if addon.user_disabled:
        return False
    if addon.foreign_install:
        return False
    # The AMO whitelist must have been broadcast to the worker nodes
    # before this lookup can work there.
    if guid not in broadcast_amo_whitelist.value:
        return False
    # The Pioneer add-on is always excluded explicitly.
    return guid != "pioneer-opt-in@mozilla.org"
def get_addons_per_client(broadcast_amo_whitelist, users_df):
    """Build a DataFrame of clients and their valid add-on GUIDs.

    Each row of ``users_df`` is mapped to a ``(client_id, addon_ids)`` pair,
    where ``addon_ids`` holds the ``addon_id`` of every entry in the row's
    ``active_addons`` that passes ``is_valid_addon``. Clients left with
    one or zero valid add-ons are dropped.

    :param broadcast_amo_whitelist: whitelist handle forwarded to
        ``is_valid_addon``.
    :param users_df: DataFrame with ``client_id`` and ``active_addons``
        columns; ``active_addons`` is iterated directly as a sequence of
        add-on records.
    :return: DataFrame with columns ``client_id`` and ``addon_ids``.
    """

    def to_client_addons(row):
        # Un-nest the add-on records into a flat list of GUIDs, discarding
        # the add-ons we do not want.
        client_id = row["client_id"]
        kept_ids = [
            entry.addon_id
            for entry in row["active_addons"]
            if is_valid_addon(broadcast_amo_whitelist, entry.addon_id, entry)
        ]
        return (client_id, kept_ids)

    def has_several_addons(pair):
        # Keep only clients with more than one surviving add-on.
        return len(pair[1]) > 1

    client_addons = users_df.rdd.map(to_client_addons)
    multi_addon_clients = client_addons.filter(has_several_addons)
    return multi_addon_clients.toDF(["client_id", "addon_ids"])
def get_initial_sample(spark):
    """Select the initial client sample from the ``clients_daily`` table.

    Coarse filtering applied, in order:
    - ``active_addons`` is not null
    - more than one entry in ``active_addons``
    - ``channel`` and ``normalized_channel`` are both ``release``
    - ``app_name`` is ``Firefox``

    Only the ``client_id`` and ``active_addons`` columns are returned.

    :param spark: session object providing ``table()``.
    :return: the filtered, two-column DataFrame.
    """
    # NOTE(review): the sample size could presumably be scaled up to see
    # how long a larger run takes.
    predicates = (
        "active_addons IS NOT null",
        "size(active_addons) > 1",
        "channel = 'release'",
        "normalized_channel = 'release'",
        "app_name = 'Firefox'",
    )

    sample = spark.table("clients_daily")
    for predicate in predicates:
        sample = sample.where(predicate)
    return sample.selectExpr("client_id", "active_addons")
def extract_telemetry(spark):
""" load some training data from telemetry given a sparkContext
"""
@ -33,68 +99,7 @@ def extract_telemetry(spark):
# Define the set of feature names to be used in the donor computations.
def get_initial_sample():
""" Takes an initial sample from the longitudinal dataset
(randomly sampled from main summary). Coarse filtering on:
- number of installed addons (greater than 1)
- corrupt and generally wierd telemetry entries
- isolating release channel
- column selection
"""
# Could scale this up to grab more than what is in
# longitudinal and see how long it takes to run.
return (
spark.table("clients_daily")
.where("active_addons IS NOT null")
.where("size(active_addons) > 1")
.where("channel = 'release'")
.where("normalized_channel = 'release'")
.where("app_name = 'Firefox'")
.selectExpr("client_id", "active_addons")
)
def get_addons_per_client(users_df):
""" Extracts a DataFrame that contains one row
for each client along with the list of active add-on GUIDs.
"""
def is_valid_addon(guid, addon):
""" Filter individual addons out to exclude, system addons,
legacy addons, disabled addons, sideloaded addons.
"""
return not (
addon.is_system
or addon.app_disabled
or addon.type != "extension"
or addon.user_disabled
or addon.foreign_install
or
# make sure the amo_whitelist has been broadcast to worker nodes.
guid not in broadcast_amo_whitelist.value
or
# Make sure that the Pioneer addon is explicitly
# excluded
guid == "pioneer-opt-in@mozilla.org"
)
# Create an add-ons dataset un-nesting the add-on map from each
# user to a list of add-on GUIDs. Also filter undesired add-ons.
return (
users_df.rdd.map(
lambda p: (
p["client_id"],
[
guid
for guid, data in list(p["active_addons"].items())
if is_valid_addon(guid, data)
],
)
)
.filter(lambda p: len(p[1]) > 1)
.toDF(["client_id", "addon_ids"])
)
client_features_frame = get_initial_sample()
client_features_frame = get_initial_sample(spark)
amo_white_list = taar_utils.load_amo_external_whitelist()
logging.info("AMO White list loaded")
@ -102,7 +107,9 @@ def extract_telemetry(spark):
broadcast_amo_whitelist = sc.broadcast(amo_white_list)
logging.info("Broadcast AMO whitelist success")
addons_info_frame = get_addons_per_client(client_features_frame)
addons_info_frame = get_addons_per_client(
broadcast_amo_whitelist, client_features_frame
)
logging.info("Filtered for valid addons only.")
taar_training = (