diff --git a/mozetl/taar/taar_lite_guidguid.py b/mozetl/taar/taar_lite_guidguid.py
index ef6c737..24cb135 100644
--- a/mozetl/taar/taar_lite_guidguid.py
+++ b/mozetl/taar/taar_lite_guidguid.py
@@ -26,6 +26,72 @@ MAIN_SUMMARY_PATH = "s3://telemetry-parquet/main_summary/v4/"
 ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime("%Y%m%d")
 
 
+def is_valid_addon(broadcast_amo_whitelist, guid, addon):
+    """ Filter out individual addons to exclude system addons,
+    legacy addons, disabled addons, and sideloaded addons.
+    """
+    return not (
+        addon.is_system
+        or addon.app_disabled
+        or addon.type != "extension"
+        or addon.user_disabled
+        or addon.foreign_install
+        or
+        # make sure the amo_whitelist has been broadcast to worker nodes.
+        guid not in broadcast_amo_whitelist.value
+        or
+        # Make sure that the Pioneer addon is explicitly
+        # excluded
+        guid == "pioneer-opt-in@mozilla.org"
+    )
+
+
+def get_addons_per_client(broadcast_amo_whitelist, users_df):
+    """ Extracts a DataFrame that contains one row
+    for each client along with the list of active add-on GUIDs.
+    """
+
+    # Create an add-ons dataset un-nesting the add-on map from each
+    # user to a list of add-on GUIDs. Also filter undesired add-ons.
+    return (
+        users_df.rdd.map(
+            lambda p: (
+                p["client_id"],
+                [
+                    addon_data.addon_id
+                    for addon_data in p["active_addons"]
+                    if is_valid_addon(
+                        broadcast_amo_whitelist, addon_data.addon_id, addon_data
+                    )
+                ],
+            )
+        )
+        .filter(lambda p: len(p[1]) > 1)
+        .toDF(["client_id", "addon_ids"])
+    )
+
+
+def get_initial_sample(spark):
+    """ Takes an initial sample from the longitudinal dataset
+    (randomly sampled from main summary). Coarse filtering on:
+    - number of installed addons (greater than 1)
+    - corrupt and generally weird telemetry entries
+    - isolating release channel
+    - column selection
+    """
+    # Could scale this up to grab more than what is in
+    # longitudinal and see how long it takes to run.
+    return (
+        spark.table("clients_daily")
+        .where("active_addons IS NOT null")
+        .where("size(active_addons) > 1")
+        .where("channel = 'release'")
+        .where("normalized_channel = 'release'")
+        .where("app_name = 'Firefox'")
+        .selectExpr("client_id", "active_addons")
+    )
+
+
 def extract_telemetry(spark):
     """ load some training data from telemetry given a sparkContext
     """
@@ -33,68 +99,7 @@ def extract_telemetry(spark):
 
     # Define the set of feature names to be used in the donor computations.
 
-    def get_initial_sample():
-        """ Takes an initial sample from the longitudinal dataset
-        (randomly sampled from main summary). Coarse filtering on:
-        - number of installed addons (greater than 1)
-        - corrupt and generally wierd telemetry entries
-        - isolating release channel
-        - column selection
-        """
-        # Could scale this up to grab more than what is in
-        # longitudinal and see how long it takes to run.
-        return (
-            spark.table("clients_daily")
-            .where("active_addons IS NOT null")
-            .where("size(active_addons) > 1")
-            .where("channel = 'release'")
-            .where("normalized_channel = 'release'")
-            .where("app_name = 'Firefox'")
-            .selectExpr("client_id", "active_addons")
-        )
-
-    def get_addons_per_client(users_df):
-        """ Extracts a DataFrame that contains one row
-        for each client along with the list of active add-on GUIDs.
-        """
-
-        def is_valid_addon(guid, addon):
-            """ Filter individual addons out to exclude, system addons,
-            legacy addons, disabled addons, sideloaded addons.
-            """
-            return not (
-                addon.is_system
-                or addon.app_disabled
-                or addon.type != "extension"
-                or addon.user_disabled
-                or addon.foreign_install
-                or
-                # make sure the amo_whitelist has been broadcast to worker nodes.
-                guid not in broadcast_amo_whitelist.value
-                or
-                # Make sure that the Pioneer addon is explicitly
-                # excluded
-                guid == "pioneer-opt-in@mozilla.org"
-            )
-
-        # Create an add-ons dataset un-nesting the add-on map from each
-        # user to a list of add-on GUIDs. Also filter undesired add-ons.
-        return (
-            users_df.rdd.map(
-                lambda p: (
-                    p["client_id"],
-                    [
-                        guid
-                        for guid, data in list(p["active_addons"].items())
-                        if is_valid_addon(guid, data)
-                    ],
-                )
-            )
-            .filter(lambda p: len(p[1]) > 1)
-            .toDF(["client_id", "addon_ids"])
-        )
-
-    client_features_frame = get_initial_sample()
+    client_features_frame = get_initial_sample(spark)
 
     amo_white_list = taar_utils.load_amo_external_whitelist()
     logging.info("AMO White list loaded")
@@ -102,7 +107,9 @@ def extract_telemetry(spark):
     broadcast_amo_whitelist = sc.broadcast(amo_white_list)
     logging.info("Broadcast AMO whitelist success")
 
-    addons_info_frame = get_addons_per_client(client_features_frame)
+    addons_info_frame = get_addons_per_client(
+        broadcast_amo_whitelist, client_features_frame
+    )
     logging.info("Filtered for valid addons only.")
 
     taar_training = (
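
Because is_valid_addon(), get_addons_per_client(), and get_initial_sample() now live at module level, they can be imported and exercised without running the full extract_telemetry() job. The sketch below is illustrative only and not part of the patch: it assumes the patched mozetl.taar.taar_lite_guidguid module is importable, and FakeBroadcast and FakeAddon are hypothetical stand-ins for pyspark's Broadcast (only its .value attribute is used here) and for an entry of the active_addons column.

# Illustrative sketch only -- FakeBroadcast and FakeAddon are hypothetical
# helpers, not part of this patch or of pyspark.
from collections import namedtuple

from mozetl.taar.taar_lite_guidguid import is_valid_addon

FakeBroadcast = namedtuple("FakeBroadcast", ["value"])
FakeAddon = namedtuple(
    "FakeAddon",
    [
        "addon_id",
        "is_system",
        "app_disabled",
        "type",
        "user_disabled",
        "foreign_install",
    ],
)

whitelist = FakeBroadcast(value={"uBlock0@raymondhill.net"})
addon = FakeAddon(
    addon_id="uBlock0@raymondhill.net",
    is_system=False,
    app_disabled=False,
    type="extension",
    user_disabled=False,
    foreign_install=False,
)

# A whitelisted, enabled, non-system extension passes the filter.
assert is_valid_addon(whitelist, addon.addon_id, addon)
# The Pioneer opt-in GUID is rejected regardless of the other fields.
assert not is_valid_addon(whitelist, "pioneer-opt-in@mozilla.org", addon)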