# Source path: telemetry-airflow/jobs/taar_locale.py
#
# Repository viewer metadata: 473 lines, 18 KiB, Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Bug 1396549 - TAAR Top addons per locale dictionary
This notebook is adapted from a gist that computes the top N addons per
locale after filtering for good candidates (e.g. no unsigned, no disabled,
...) [1].
[1] https://gist.github.com/mlopatka/46dddac9d063589275f06b0443fcc69d
Original source and test code can be found in the python_mozetl
repository at:
* https://github.com/mozilla/python_mozetl/blob/13266cf/mozetl/taar/taar_locale.py
* https://github.com/mozilla/python_mozetl/blob/13266cf/tests/test_taar_locale.py
"""
import click
import json
import logging
import os
from google.cloud import storage
import io
import bz2
from datetime import date, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pandas import DataFrame, IndexSlice
from numpy.random import laplace as rlaplace
# Configure root logging once at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Base name of the output object; `main()` appends ".json" to it.
LOCALE_FILE_NAME = "top10_dict"
# Set the max multiplicative DP difference in probabilities to
# exp(epsilon) ~= 1.5.
EPSILON = 0.4
def get_client_addons(spark, start_date):
    """Build a Spark DF of (locale, client_id, addon) rows for release clients.

    The `clients_daily` parquet partition for `start_date` is read from GCS
    and registered as the temporary view `clients_daily`. The SQL query then
    keeps Firefox release-channel rows with a non-null `client_id`, keeps one
    row per client, explodes `active_addons`, and drops add-ons that are
    blocklisted, not of type 'extension', not fully signed, user- or
    app-disabled, or system add-ons. Rows whose locale is the string 'null'
    or whose add-on ID is NULL are dropped as well.

    NOTE(review): every row carries the same `submission_date_s3` literal, so
    the `ORDER BY` in the de-duplication step cannot distinguish a client's
    rows; which row is kept looks arbitrary -- confirm intent.

    :param spark: the active SparkSession
    :param start_date: the submission date partition to read (yyyymmdd)
    :return: a DF with columns `locale`, `client_id`, `addon`
    """
    # Register the requested partition under the view name used by the query.
    partition_url = (
        "gs://airflow-dataproc-bq-parquet-exports/clients_daily/v6/"
        f"submission_date_s3={start_date}"
    )
    spark.read.parquet(partition_url).createOrReplaceTempView("clients_daily")

    # The SQL text is kept flush-left so the query string itself is unchanged.
    query_template = """
WITH sample AS (
SELECT
client_id,
'{start_date}' as submission_date_s3,
locale,
active_addons
FROM clients_daily
WHERE
app_name='Firefox'
AND channel='release'
AND client_id IS NOT NULL
),
sample_dedup AS (
SELECT
client_id,
locale,
explode(active_addons) AS addon_info
FROM (
SELECT
*,
-- retain the most recent active day for each client
row_number() OVER (
PARTITION BY client_id
ORDER BY submission_date_s3 DESC
) AS idx
FROM sample
)
WHERE idx = 1
)
SELECT
locale,
client_id,
addon_info.addon_id as addon
FROM sample_dedup
WHERE
addon_info.blocklisted = FALSE -- not blocklisted
AND addon_info.type = 'extension' -- nice webextensions only
AND addon_info.signed_state = 2 -- fully reviewed addons only
AND addon_info.user_disabled = FALSE -- active addons only get counted
AND addon_info.app_disabled = FALSE -- exclude compatibility disabled addons
AND addon_info.is_system = FALSE -- exclude system addons
AND locale <> 'null'
AND addon_info.addon_id IS NOT NULL
"""
    return spark.sql(query_template.format(start_date=start_date))
def limit_client_addons(spark, client_addons_df, addon_limits, whitelist):
    """Cap the number of whitelisted add-ons kept for each client.

    This is one step of the privacy protection applied to the raw data. Only
    add-ons on the whitelist are considered; when a client has more of them
    than its locale allows, a random subset of the allowed size is kept.

    Side effect: registers the temporary view `client_addons`.

    :param spark: the active SparkSession
    :param client_addons_df: a DF listing add-on IDs by client ID and locale, as
        generated by `get_client_addons()`
    :param addon_limits: a dict mapping locale strings to ints representing the
        max number of add-ons retained per client in that locale.
        Any locale not present in the dict is excluded from the
        final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :return: a DF containing a subset of the rows of `client_addons_df`, with
        columns `client_id`, `locale`, `addon`
    """
    # Turn the limits dict into a two-column Spark DF so it can be joined in.
    locale_field = StructField("locale", StringType(), False)
    max_addons_field = StructField("client_max_addons", IntegerType(), False)
    limits_df = spark.createDataFrame(
        addon_limits.items(),
        schema=StructType([locale_field, max_addons_field]),
    )

    # The inner join drops locales that have no limit; the filter then keeps
    # whitelist add-ons only. Resulting columns:
    # (locale, client_id, addon, client_max_addons).
    whitelisted_df = client_addons_df.join(
        limits_df, on="locale", how="inner"
    ).where(col("addon").isin(whitelist))
    whitelisted_df.createOrReplaceTempView("client_addons")

    # Clients over their limit get a Uniform(0,1) sort key per add-on, which
    # shuffles their add-ons; everyone else gets NULL keys because all of
    # their add-ons are retained anyway. Rows are then numbered within each
    # client and cut off at the limit.
    limit_query = """
WITH addons AS (
-- add a convenience column listing the number of add-ons each client has
SELECT
*,
COUNT(client_id) OVER (PARTITION BY client_id) AS num_client_addons
FROM client_addons
),
shuffle_ord AS (
-- add the auxiliary sorting column
SELECT
*,
-- only need to shuffle if the client has too many add-ons
CASE WHEN num_client_addons > client_max_addons THEN RAND() ELSE NULL
END AS ord
FROM addons
)
SELECT
client_id,
locale,
addon
FROM (
SELECT
*,
row_number() OVER (PARTITION BY client_id ORDER BY ord) AS idx
FROM shuffle_ord
)
WHERE idx <= client_max_addons
"""
    return spark.sql(limit_query)
def compute_noisy_counts(
    locale_addon_counts, addon_limits, whitelist, eps=EPSILON
):
    """Apply DP protections to the raw per-locale add-on frequency counts.

    Laplace noise is added to each of the counts. Additionally, each per-locale
    set of frequency counts is expanded to include every add-on in the
    whitelist, even if some were not observed in the raw data.

    This computation is done in local memory, rather than in Spark, to simplify
    working with random number generation. This relies on the assumption that
    the number of unique locales and whitelist add-ons each remain small (on the
    order of 100-1000).

    :param locale_addon_counts: a Pandas DF of per-locale add-on frequency
        counts, with columns `locale`, `addon`, `count`
    :param addon_limits: a dict mapping locale strings to ints representing the
        max number of add-ons retained per client in that locale.
        Any locale not present in the dict is excluded from the
        final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :param eps: the DP epsilon parameter, representing the privacy budget
    :return: a DF with the same structure as `locale_addon_counts`. Counts are
        floating point and may be negative.
    """
    # First expand the frequency count table to include all whitelisted add-ons
    # in each locale.
    locale_wl_addons = DataFrame.from_records(
        [(loc, a) for loc in addon_limits for a in whitelist],
        columns=["locale", "addon"],
    )
    raw_counts = locale_addon_counts.set_index(["locale", "addon"])
    locale_wl = locale_wl_addons.set_index(["locale", "addon"])
    # Left join means the result will have every add-on in the whitelist and
    # only the locales in the limits dict. Unobserved pairs get a count of 0.
    expanded_counts = locale_wl.join(raw_counts, how="left").fillna(0)
    # When every (locale, add-on) pair was observed the join introduces no
    # NaN and the column stays integer-typed. Cast explicitly so that the
    # real-valued noise below is never written into an integer column.
    expanded_counts["count"] = expanded_counts["count"].astype(float)

    # Add the Laplace noise.
    #
    # For each add-on in the whitelist, in each locale, we take the observed
    # installation frequency count and add independent random noise.
    #
    # The random noise is Laplace-distributed with scale parameter m/epsilon,
    # where epsilon is the DP privacy budget, and m is the max number of
    # add-ons reported per client in the current locale.
    #
    # Since the Laplace parametrization depends only on locale, we iterate over
    # locales and add a numpy array of independent simulated Laplace random
    # values to the series of add-on frequency counts.
    for locale in expanded_counts.index.unique("locale"):
        # The scale parameter depends on the max number of add-ons per client,
        # which varies by locale.
        locale_laplace_param = float(addon_limits[locale]) / eps
        # Select counts for all add-ons in the current locale.
        locale_idx = IndexSlice[locale, :]
        locale_counts = expanded_counts.loc[locale_idx, "count"]
        noise = rlaplace(scale=locale_laplace_param, size=len(locale_counts))
        # Build a new Series rather than mutating the selection in place.
        expanded_counts.loc[locale_idx, "count"] = locale_counts + noise
    return expanded_counts.reset_index()
def get_addon_limits_by_locale(client_addons_df):
    """Map every locale seen in the data to its per-client add-on limit.

    The limit could later be derived from a summary statistic of the data, in
    which case it would remain private. For now each observed locale simply
    gets a limit of 1.

    :param client_addons_df: a DF listing add-on IDs by client ID and locale, as
        generated by `get_client_addons()`
    :return: a dict mapping locale strings to their add-on limits
    """
    # Collect the distinct locales to the driver and give each a limit of 1.
    locale_rows = client_addons_df.select("locale").distinct().collect()
    return dict.fromkeys((row["locale"] for row in locale_rows), 1)
def get_protected_locale_addon_counts(spark, client_addons_df, bucket):
    """Produce DP-protected add-on frequency counts for each locale.

    The pipeline is: load the AMO whitelist, pick the per-locale add-on
    limits, restrict each client to at most that many whitelist add-ons,
    count add-ons per locale, and finally add Laplace noise to the counts.

    :param spark: the active SparkSession
    :param client_addons_df: a DF listing add-on IDs by client ID and locale, as
        generated by `get_client_addons()`
    :param bucket: name of the GCS bucket that holds the curated whitelist
    :return: a Pandas DF with columns `locale`, `addon`, `count` containing
        DP-protected counts for each whitelist add-on within each locale.
        Unlike the true counts, weights may be non-integer or negative.
    """
    # External whitelist based on AMO data.
    whitelist = load_amo_curated_whitelist(bucket)
    # Max number of add-ons retained per user, keyed by locale.
    limits = get_addon_limits_by_locale(client_addons_df)
    # Apply the per-client cap, restricted to whitelist add-ons.
    capped_df = limit_client_addons(spark, client_addons_df, limits, whitelist)
    # Aggregate in Spark, then bring the small result to the driver as Pandas.
    raw_counts = capped_df.groupBy("locale", "addon").count().toPandas()
    # Add noise to the frequency counts.
    return compute_noisy_counts(raw_counts, limits, whitelist, EPSILON)
def get_top_addons_by_locale(addon_counts, num_addons):
    """Generate a dictionary of top-weighted add-ons by locale.

    Raw counts are normalized by converting to relative proportions.

    :param addon_counts: a Pandas DF of per-locale add-on counts, with columns
        `locale`, `addon`, `count`.
    :param num_addons: requested number of recommendations.
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
        with each list sorted by descending weight and holding at most
        `num_addons` entries.
    """
    top_addons = {}
    addons_by_locale = addon_counts.set_index(["locale", "addon"])
    for locale in addons_by_locale.index.unique("locale"):
        # For each locale, work with a Series of counts indexed by add-on.
        counts = addons_by_locale.loc[locale, "count"]
        # Shift counts so as to align the smallest count with 0 in each locale.
        # Since DP-protected counts can be negative, this is done to avoid
        # problems computing the sum.
        shifted_counts = counts - counts.min()
        # NOTE: if all counts in a locale are equal the shifted sum is 0 and
        # the resulting weights are NaN.
        rel_counts = shifted_counts / shifted_counts.sum()
        top_rel_counts = rel_counts.sort_values(ascending=False).head(
            num_addons
        )
        # `Series.iteritems()` was removed in pandas 2.0; `items()` yields the
        # same (index, value) pairs on old and new pandas versions.
        top_addons[locale] = list(top_rel_counts.items())
    return top_addons
def generate_dictionary(spark, num_addons, dataset_num_days, bucket):
    """Build the per-locale lists of top add-ons from client add-on data.

    Pulls add-on data via `get_client_addons()`, computes DP-protected
    frequency counts, and turns them into a weighted list of top add-ons for
    each locale. The result is also written to the log.

    :param spark: the active SparkSession
    :param num_addons: number of add-on recommendations to report for each locale
    :param dataset_num_days: number of days the raw data should cover
    :param bucket: name of the GCS bucket that holds the curated whitelist
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
        as returned by `get_top_addons_by_locale()`
    """
    # Go back one extra day, since the latest date in the dataset tends to
    # lag 1 day behind. The date is derived from today's date.
    lookback = timedelta(days=dataset_num_days + 1)
    start_date = (date.today() - lookback).strftime("%Y%m%d")

    # Fresh add-on data from clients_daily.
    client_addons = get_client_addons(spark, start_date)
    # DP-protected frequency-based counts for each add-on by locale.
    protected_counts = get_protected_locale_addon_counts(
        spark, client_addons, bucket
    )
    # Top add-ons in each locale based on these counts.
    top_addons = get_top_addons_by_locale(protected_counts, num_addons)
    logger.info(f"Top addons: {top_addons}")
    return top_addons
def read_from_gcs(fname, prefix, bucket):
    """Download a bz2-compressed JSON object from GCS and decode it.

    The object read is `gs://<bucket>/<prefix>/<fname>.bz2`.

    :param fname: object base name, without the ".bz2" suffix
    :param prefix: GCS prefix the object lives under
    :param bucket: name of the GCS bucket
    :return: the decoded JSON value, or None if anything went wrong (the
        error is logged with its traceback)
    """
    simple_fname = f"{prefix}/{fname}.bz2"
    try:
        client = storage.Client()
        # Use a separate name for the Bucket object so that `bucket` still
        # holds the bucket name when the error message below is built.
        gcs_bucket = client.get_bucket(bucket)
        blob = gcs_bucket.blob(simple_fname)
        with io.BytesIO() as tmpfile:
            blob.download_to_file(tmpfile)
            payload = tmpfile.getvalue()
        payload = bz2.decompress(payload)
        return json.loads(payload.decode("utf8"))
    except Exception:
        logger.exception(f"Error reading from GCS gs://{bucket}/{simple_fname}")
        # Failures are reported to the caller as None.
        return None
def load_amo_curated_whitelist(bucket):
    """Return the curated whitelist of add-on GUIDs.

    The whitelist is read from the object
    `addon_recommender/only_guids_top_200.json.bz2` in the given bucket.

    :param bucket: name of the GCS bucket that holds the whitelist
    :return: a list of add-on GUIDs
    :raises RuntimeError: if the whitelist could not be read
    """
    whitelist = read_from_gcs(
        "only_guids_top_200.json", "addon_recommender", bucket,
    )
    # `read_from_gcs()` returns None on failure; fail with a clear message
    # instead of the opaque TypeError that `list(None)` would raise.
    if whitelist is None:
        raise RuntimeError(
            f"Could not load the AMO curated whitelist from bucket {bucket}"
        )
    return list(whitelist)
def store_json_to_gcs(
    bucket, prefix, filename, json_obj, iso_date_str,
):
    """Serialize an object to bz2-compressed JSON and upload it to GCS.

    Two copies are uploaded: "<prefix>/<filename>.bz2" and, for backup
    purposes, "<prefix>/<filename>.<iso_date_str>.bz2".

    Errors are logged with their traceback and not re-raised.

    :param bucket: The GCS bucket name.
    :param prefix: The GCS prefix.
    :param filename: A string with the base name of the objects to upload.
    :param json_obj: A JSON-serializable object holding the content to write.
    :param iso_date_str: A date string in the "YYYYMMDD" format.
    """
    try:
        byte_data = json.dumps(json_obj).encode("utf8")
        byte_data = bz2.compress(byte_data)
        logger.info(f"Compressed data is {len(byte_data)} bytes")
        client = storage.Client()
        # Use a separate name for the Bucket object so that `bucket` still
        # holds the bucket name when the error message below is built.
        gcs_bucket = client.get_bucket(bucket)
        object_names = (
            f"{prefix}/{filename}.bz2",
            f"{prefix}/{filename}.{iso_date_str}.bz2",
        )
        for object_name in object_names:
            blob = gcs_bucket.blob(object_name)
            blob.chunk_size = 5 * 1024 * 1024  # Set 5 MB blob size
            blob.upload_from_string(byte_data)
            # Report success only once the upload has actually completed.
            print(f"Wrote out {object_name}")
    except Exception:
        logger.exception(f"Error saving to GCS, Bucket: {bucket}, base object name: {prefix}/{filename}")
@click.command()
@click.option("--date", required=True)
@click.option("--bucket", default="taar_models")
@click.option("--prefix", default="taar/locale")
@click.option("--num_addons", default=10)
@click.option("--client_data_num_days", default=28)
def main(
    date, bucket, prefix, num_addons, client_data_num_days,
):
    """Compute the top add-ons per locale and upload the result to GCS.

    :param date: date string used to label the dated backup copy of the output
    :param bucket: GCS bucket used both for the whitelist and for the output
    :param prefix: GCS prefix the output is written under
    :param num_addons: number of add-on recommendations to keep per locale
    :param client_data_num_days: number of days the raw data should cover
    """
    # Log through the module logger, like the rest of the file.
    logger.info("Starting taar-locale")
    logger.info("Acquiring spark session")
    spark = SparkSession.builder.appName("taar_locale").getOrCreate()
    try:
        logger.info("Processing top N addons per locale")
        locale_dict = generate_dictionary(
            spark, num_addons, client_data_num_days, bucket
        )
        current_filename = "{}.json".format(LOCALE_FILE_NAME)
        store_json_to_gcs(
            bucket, prefix,
            current_filename,
            locale_dict,
            date,
        )
    finally:
        # Release the Spark session even when the job fails part-way.
        spark.stop()
# Run the click command when this file is executed as a script.
if __name__ == "__main__":
    main()