# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
|
|
Bug 1396549 - TAAR Top addons per locale dictionary
|
|
This notebook is adapted from a gist that computes the top N addons per
|
|
locale after filtering for good candidates (e.g. no unsigned, no disabled,
|
|
...) [1].
|
|
|
|
[1] https://gist.github.com/mlopatka/46dddac9d063589275f06b0443fcc69d
|
|
|
|
|
|
|
|
Original source and test code can be found in the python_mozetl
|
|
repository at:
|
|
|
|
* https://github.com/mozilla/python_mozetl/blob/13266cf/mozetl/taar/taar_locale.py
|
|
* https://github.com/mozilla/python_mozetl/blob/13266cf/tests/test_taar_locale.py
|
|
"""

import bz2
import io
import json
import logging
from datetime import date, timedelta

import click
from google.cloud import storage
from numpy.random import laplace as rlaplace
from pandas import DataFrame, IndexSlice
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

LOCALE_FILE_NAME = "top10_dict"
# Set the max multiplicative DP difference in probabilities to
# exp(epsilon) ~= 1.5.
EPSILON = 0.4


def get_client_addons(spark, start_date):
    """Returns a Spark DF listing add-ons by client_id and locale.

    Only Firefox release clients are considered. The query finds each client's
    most recent record in the `clients_daily` dataset over the given time
    period and returns its installed add-ons. System add-ons, disabled
    add-ons, and unsigned add-ons are filtered out.

    :param spark: the SparkSession to use for running the query
    :param start_date: the earliest submission date to include (yyyymmdd)
    :return: a DF with columns `locale`, `client_id`, `addon`
    """
    addons_query_template = """
        WITH sample AS (
            SELECT
                client_id,
                '{start_date}' AS submission_date_s3,
                locale,
                active_addons
            FROM clients_daily
            WHERE
                app_name = 'Firefox'
                AND channel = 'release'
                AND client_id IS NOT NULL
        ),
        sample_dedup AS (
            SELECT
                client_id,
                locale,
                explode(active_addons) AS addon_info
            FROM (
                SELECT
                    *,
                    -- retain the most recent active day for each client
                    row_number() OVER (
                        PARTITION BY client_id
                        ORDER BY submission_date_s3 DESC
                    ) AS idx
                FROM sample
            )
            WHERE idx = 1
        )
        SELECT
            locale,
            client_id,
            addon_info.addon_id AS addon
        FROM sample_dedup
        WHERE
            addon_info.blocklisted = FALSE        -- not blocklisted
            AND addon_info.type = 'extension'     -- webextensions only
            AND addon_info.signed_state = 2       -- fully reviewed add-ons only
            AND addon_info.user_disabled = FALSE  -- count active add-ons only
            AND addon_info.app_disabled = FALSE   -- exclude compatibility-disabled add-ons
            AND addon_info.is_system = FALSE      -- exclude system add-ons
            AND locale <> 'null'
            AND addon_info.addon_id IS NOT NULL
    """

    addons_query = addons_query_template.format(start_date=start_date)

    gs_url = (
        "gs://airflow-dataproc-bq-parquet-exports/clients_daily/v6"
        "/submission_date_s3={}".format(start_date)
    )
    parquet_file = spark.read.parquet(gs_url)

    # Register the parquet data as a temporary view so that the SQL query
    # above can refer to it as `clients_daily`.
    parquet_file.createOrReplaceTempView("clients_daily")
    return spark.sql(addons_query)


def limit_client_addons(spark, client_addons_df, addon_limits, whitelist):
    """Limit the number of add-ons associated with a single client ID.

    This is part of the privacy protection mechanism applied to the raw data.
    For each client in the dataset, we retain a randomly selected subset of
    their add-ons which belong to the whitelist. The max number of add-ons may
    differ by locale.

    :param spark: the SparkSession to use for running the query
    :param client_addons_df: a DF listing add-on IDs by client ID and locale,
                             as generated by `get_client_addons()`
    :param addon_limits: a dict mapping locale strings to ints representing the
                         max number of add-ons retained per client in that
                         locale. Any locale not present in the dict is excluded
                         from the final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :return: a DF containing a subset of the rows of `client_addons_df`
    """

    # Convert the dict of limits to a Spark DF that can be joined in.
    addon_limits_schema = StructType(
        [
            StructField("locale", StringType(), False),
            StructField("client_max_addons", IntegerType(), False),
        ]
    )
    addon_limits_df = spark.createDataFrame(
        list(addon_limits.items()), schema=addon_limits_schema
    )
    # An inner join means that data for any locale not listed in the dict
    # gets dropped.
    client_df = client_addons_df.join(addon_limits_df, on="locale", how="inner")
    # client_df now has columns (locale, client_id, addon, client_max_addons).
    # Restrict to whitelist add-ons.
    client_df = client_df.where(col("addon").isin(whitelist))
    client_df.createOrReplaceTempView("client_addons")

    # Limit each client's add-ons to a random subset of max allowed size.
    # Add-ons are shuffled by generating an auxiliary column of independent
    # Uniform(0,1) random variables and sorting along this column within
    # clients. Shuffling only needs to be done if the client has more than the
    # max allowed add-ons (otherwise they will all be retained).
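    # Capping each client at `client_max_addons` add-ons is what bounds the
    # sensitivity of the per-locale frequency counts: a single client can
    # change at most m counts by 1 each, which is exactly the bound that the
    # Laplace noise scale m/epsilon in compute_noisy_counts() relies on.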
    return spark.sql(
        """
        WITH addons AS (
            -- add a convenience column listing the number of add-ons each
            -- client has
            SELECT
                *,
                COUNT(client_id) OVER (PARTITION BY client_id) AS num_client_addons
            FROM client_addons
        ),
        shuffle_ord AS (
            -- add the auxiliary sorting column
            SELECT
                *,
                -- only need to shuffle if the client has too many add-ons
                CASE WHEN num_client_addons > client_max_addons THEN RAND() ELSE NULL
                END AS ord
            FROM addons
        )
        SELECT
            client_id,
            locale,
            addon
        FROM (
            SELECT
                *,
                row_number() OVER (PARTITION BY client_id ORDER BY ord) AS idx
            FROM shuffle_ord
        )
        WHERE idx <= client_max_addons
        """
    )


def compute_noisy_counts(
    locale_addon_counts, addon_limits, whitelist, eps=EPSILON
):
    """Apply DP protections to the raw per-locale add-on frequency counts.

    Laplace noise is added to each of the counts. Additionally, each per-locale
    set of frequency counts is expanded to include every add-on in the
    whitelist, even if some were not observed in the raw data.

    This computation is done in local memory, rather than in Spark, to simplify
    working with random number generation. It relies on the assumption that the
    numbers of unique locales and whitelist add-ons each remain small (on the
    order of 100-1000).

    :param locale_addon_counts: a Pandas DF of per-locale add-on frequency
                                counts, with columns `locale`, `addon`, `count`
    :param addon_limits: a dict mapping locale strings to ints representing the
                         max number of add-ons retained per client in that
                         locale. Any locale not present in the dict is excluded
                         from the final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :param eps: the DP epsilon parameter, representing the privacy budget
    :return: a DF with the same structure as `locale_addon_counts`. Counts may
             now be non-integer and negative.
    """

    # First expand the frequency count table to include all whitelisted add-ons
    # in each locale.
    locale_wl_addons = DataFrame.from_records(
        [(loc, a) for loc in addon_limits.keys() for a in whitelist],
        columns=["locale", "addon"],
    )
    raw_counts = locale_addon_counts.set_index(["locale", "addon"])
    locale_wl = locale_wl_addons.set_index(["locale", "addon"])
    # A left join means the result will have every add-on in the whitelist and
    # only the locales in the limits dict.
    expanded_counts = locale_wl.join(raw_counts, how="left").fillna(0)

    # Add the Laplace noise.
    #
    # For each add-on in the whitelist, in each locale, we take the observed
    # installation frequency count and add independent random noise. Observed
    # frequencies may be 0 if no profile had those add-ons installed.
    #
    # The random noise is Laplace-distributed with scale parameter m/eps,
    # where eps is the DP privacy budget, and m is the max number of add-ons
    # reported per client in the current locale.
    #
    # Since the Laplace parametrization depends only on locale, we iterate over
    # locales and add a numpy array of independent simulated Laplace random
    # values to the series of add-on frequency counts.
    #
    # Since the Laplace noise is continuous and real-valued, counts will no
    # longer be integer, and may become negative.
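    #
    # For example, with the current limit of 1 add-on per client (see
    # get_addon_limits_by_locale()) and eps = 0.4, the noise scale is
    # 1 / 0.4 = 2.5, i.e. a standard deviation of 2.5 * sqrt(2) ~= 3.5 added
    # to each count.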
    for locale in expanded_counts.index.unique("locale"):
        # The scale parameter depends on the max number of add-ons per client,
        # which varies by locale.
        locale_laplace_param = float(addon_limits[locale]) / eps
        # Select counts for all add-ons in the current locale.
        locale_idx = IndexSlice[locale, :]
        locale_counts = expanded_counts.loc[locale_idx, "count"]
        locale_counts += rlaplace(
            scale=locale_laplace_param, size=len(locale_counts)
        )
        expanded_counts.loc[locale_idx, "count"] = locale_counts

    return expanded_counts.reset_index()


def get_addon_limits_by_locale(client_addons_df):
    """Determine the max number of add-ons per user in each locale.

    We allow for the possibility of basing this on a summary statistic
    computed from the original data, in which case the limits will remain
    private.

    :param client_addons_df: a DF listing add-on IDs by client ID and locale,
                             as generated by `get_client_addons()`
    :return: a dict mapping locale strings to their add-on limits
    """

    # For now, set add-on limits to 1 per client for each locale observed in
    # the dataset.
    all_locales = client_addons_df.select("locale").distinct().collect()
    return {r["locale"]: 1 for r in all_locales}


def get_protected_locale_addon_counts(spark, client_addons_df, bucket):
    """Compute DP-protected per-locale add-on frequency counts.

    Privacy-preserving counts are generated using the Laplace mechanism,
    restricting to add-ons in the AMO whitelist.

    :param spark: the SparkSession to use for running the queries
    :param client_addons_df: a DF listing add-on IDs by client ID and locale,
                             as generated by `get_client_addons()`
    :param bucket: the GCS bucket from which to load the AMO whitelist
    :return: a Pandas DF with columns `locale`, `addon`, `count` containing
             DP-protected counts for each whitelist add-on within each locale.
             Unlike the true counts, weights may be non-integer or negative.
    """

    # Load the external whitelist based on AMO data.
    amo_whitelist = load_amo_curated_whitelist(bucket)

    # Determine the max number of add-ons per user in each locale.
    locale_addon_limits = get_addon_limits_by_locale(client_addons_df)

    # Limit the number of add-ons per client.
    limited_addons_df = limit_client_addons(
        spark, client_addons_df, locale_addon_limits, amo_whitelist
    )

    # Aggregate to a Pandas DF of locale/add-on frequency counts.
    locale_addon_counts = (
        limited_addons_df.groupBy("locale", "addon").count().toPandas()
    )

    # Add noise to the frequency counts.
    noisy_addon_counts = compute_noisy_counts(
        locale_addon_counts, locale_addon_limits, amo_whitelist, EPSILON
    )

    return noisy_addon_counts


def get_top_addons_by_locale(addon_counts, num_addons):
    """Generate a dictionary of top-weighted add-ons by locale.

    Raw counts are normalized by converting them to relative proportions.

    :param addon_counts: a Pandas DF of per-locale add-on counts, with columns
                         `locale`, `addon`, `count`
    :param num_addons: requested number of recommendations
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
    """

    top_addons = {}
    addons_by_locale = addon_counts.set_index(["locale", "addon"])

    for locale in addons_by_locale.index.unique("locale"):
        # For each locale, work with a Series of counts indexed by add-on.
        counts = addons_by_locale.loc[locale, "count"]
        # Shift counts so as to align the smallest count with 0 in each locale.
        # Since DP-protected counts can be negative, this is done to avoid
        # problems computing the sum.
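        # Worked example: noisy counts [5, 3, -1] shift to [6, 4, 0] and
        # normalize to weights [0.6, 0.4, 0.0]; the smallest count always maps
        # to weight 0.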
        shifted_counts = counts - counts.min()
        rel_counts = shifted_counts / shifted_counts.sum()
        top_rel_counts = rel_counts.sort_values(ascending=False).head(
            num_addons
        )

        # `Series.iteritems()` was removed in pandas 2.0; use `items()`.
        top_addons[locale] = list(top_rel_counts.items())

    return top_addons


def generate_dictionary(spark, num_addons, dataset_num_days, bucket):
    """Compile lists of top add-ons by locale from per-client add-on data.

    Runs a fresh data pull against `clients_daily`, computes DP-protected
    frequency counts, and generates a weighted list of top add-ons by locale.

    :param spark: the SparkSession to use for running the queries
    :param num_addons: number of add-on recommendations to report for each
                       locale
    :param dataset_num_days: number of days the raw data should cover
    :param bucket: the GCS bucket from which to load the AMO whitelist
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
             as returned by `get_top_addons_by_locale()`
    """

    # Execute the Spark SQL query to get fresh add-ons from clients_daily.
    # Add an extra day to the date range since the latest date in the dataset
    # tends to lag one day behind.
    earliest_date = date.today() - timedelta(days=dataset_num_days + 1)
    earliest_date_fmt = earliest_date.strftime("%Y%m%d")
    addon_df = get_client_addons(spark, earliest_date_fmt)

    # Compute DP-protected frequency-based counts for each add-on by locale.
    addon_df_protected = get_protected_locale_addon_counts(
        spark, addon_df, bucket
    )

    # Find the top add-ons in each locale based on these counts.
    top_addons = get_top_addons_by_locale(addon_df_protected, num_addons)

    logger.info(f"Top addons: {top_addons}")

    return top_addons


def read_from_gcs(fname, prefix, bucket):
    """Read a bz2-compressed JSON object from `gs://<bucket>/<prefix>/<fname>.bz2`."""
    simple_fname = f"{prefix}/{fname}.bz2"
    try:
        with io.BytesIO() as tmpfile:
            client = storage.Client()
            # Bind the Bucket object to a new name so the error message below
            # shows the bucket name rather than the object's repr.
            gcs_bucket = client.get_bucket(bucket)
            blob = gcs_bucket.blob(simple_fname)
            blob.download_to_file(tmpfile)
            tmpfile.seek(0)
            payload = bz2.decompress(tmpfile.read())
            return json.loads(payload.decode("utf8"))
    except Exception:
        logger.exception(f"Error reading from GCS gs://{bucket}/{simple_fname}")
        # Re-raise: callers (e.g. load_amo_curated_whitelist()) cannot proceed
        # without this data, and silently returning None would only fail later
        # with a less informative error.
        raise


def load_amo_curated_whitelist(bucket):
    """Return the curated whitelist of add-on GUIDs."""
    whitelist = read_from_gcs(
        "only_guids_top_200.json", "addon_recommender", bucket
    )
    return list(whitelist)


def store_json_to_gcs(bucket, prefix, filename, json_obj, iso_date_str):
    """Compress the JSON data with bz2 and upload it to GCS.

    Two copies of the object get uploaded: one as "<prefix>/<filename>.bz2"
    and one as "<prefix>/<filename>.<YYYYMMDD>.bz2" for backup purposes.

    :param bucket: The GCS bucket name.
    :param prefix: The GCS prefix.
    :param filename: A string with the base name of the object to write.
    :param json_obj: The JSON-serializable object to write.
    :param iso_date_str: A date string in the "YYYYMMDD" format.
    """

    try:
        byte_data = json.dumps(json_obj).encode("utf8")
        byte_data = bz2.compress(byte_data)
        logger.info(f"Compressed data is {len(byte_data)} bytes")

        client = storage.Client()
        gcs_bucket = client.get_bucket(bucket)

        simple_fname = f"{prefix}/{filename}.bz2"
        blob = gcs_bucket.blob(simple_fname)
        blob.chunk_size = 5 * 1024 * 1024  # upload in 5 MB chunks
        blob.upload_from_string(byte_data)
        logger.info(f"Wrote out gs://{bucket}/{simple_fname}")

        long_fname = f"{prefix}/{filename}.{iso_date_str}.bz2"
        blob = gcs_bucket.blob(long_fname)
        blob.chunk_size = 5 * 1024 * 1024  # upload in 5 MB chunks
        blob.upload_from_string(byte_data)
        logger.info(f"Wrote out gs://{bucket}/{long_fname}")
    except Exception:
        logger.exception(
            f"Error saving to GCS, bucket: {bucket}, "
            f"base object name: {prefix}/{filename}"
        )


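# Example invocation (the date below is hypothetical):
#
#   python taar_locale.py --date 20210101 --num_addons 10 \
#       --bucket taar_models --prefix taar/locale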
@click.command()
# Bind the --date option to `date_str` so that it does not shadow the
# `datetime.date` import at the top of the module.
@click.option("--date", "date_str", required=True)
@click.option("--bucket", default="taar_models")
@click.option("--prefix", default="taar/locale")
@click.option("--num_addons", default=10)
@click.option("--client_data_num_days", default=28)
def main(date_str, bucket, prefix, num_addons, client_data_num_days):
    logger.info("Starting taar-locale")
    logger.info("Acquiring spark session")
    spark = SparkSession.builder.appName("taar_locale").getOrCreate()

    logger.info("Processing top N addons per locale")
    locale_dict = generate_dictionary(
        spark, num_addons, client_data_num_days, bucket
    )

    current_filename = "{}.json".format(LOCALE_FILE_NAME)
    store_json_to_gcs(
        bucket, prefix, current_filename, locale_dict, date_str,
    )

    spark.stop()


if __name__ == "__main__":
    main()