Rewrite TAAR locale job to compute DP-protected frequency weights for all whitelisted add-ons

Dave Zeber 2019-01-10 19:54:53 -06:00 committed by Anthony Miyaguchi
Parent 491fbda515
Commit 4af20a66ca
3 changed files with 739 additions and 258 deletions

View file

@@ -12,9 +12,13 @@ import click
import json
import logging
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, rank, desc
from pyspark.sql.window import Window
from datetime import date, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pandas import DataFrame, IndexSlice
from numpy.random import laplace as rlaplace
from .taar_utils import store_json_to_s3
from .taar_utils import load_amo_curated_whitelist
@@ -22,173 +26,308 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
LOCALE_FILE_NAME = "top10_dict"
# Set the max multiplicative DP difference in probabilities to
# exp(epsilon) ~= 1.5.
EPSILON = 0.4
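# Illustrative sketch (hypothetical helper, not in the actual module): a quick
# numeric check of the comment above. exp(EPSILON) bounds the multiplicative
# change in output probabilities when a single client's data changes, and the
# Laplace noise scale used later in this job is sensitivity / epsilon, where
# the sensitivity is the per-client add-on limit for a locale.
def _illustrate_dp_parameters(eps=EPSILON):
    from math import exp
    max_prob_ratio = exp(eps)  # ~1.49 for eps = 0.4, the "max multiplicative DP difference"
    scale_limit_1 = 1.0 / eps  # 2.5: Laplace scale when clients contribute at most 1 add-on
    scale_limit_3 = 3.0 / eps  # 7.5: Laplace scale when the per-client limit is 3
    return max_prob_ratio, scale_limit_1, scale_limit_3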
def get_addons(spark):
def get_client_addons(spark, start_date, end_date=None):
"""Returns a Spark DF listing add-ons by client_id and locale.
Only Firefox release clients are considered. The query finds each client's
most recent record in the `clients_daily` dataset over the given time period
and returns its installed add-ons. System add-ons, disabled add-ons, and
unsigned add-ons are filtered out.
:param start_date: the earliest submission date to include (yyyymmdd)
:param end_date, optional: the latest submission date to include (yyyymmdd)
:return: a DF with columns `locale`, `client_id`, `addon`
"""
Only Firefox release clients are considered.
Columns are exploded (over addon keys) to include the locale of each
add-on installation instance; system add-ons, disabled add-ons, and
unsigned add-ons are filtered out.
Results are sorted by add-on installations and grouped by locale.
Note that the final result of this job does not include the Firefox
telemetry client ID, so we do not need to post-process the data in the
get_addons function.
addons_query_template = """
WITH sample AS (
SELECT
client_id,
submission_date_s3,
locale,
active_addons
FROM clients_daily
WHERE
app_name='Firefox'
AND channel='release'
AND submission_date_s3 >= '{start_date}'
{end_date_filter}
AND client_id IS NOT NULL
),
sample_dedup AS (
SELECT
client_id,
locale,
explode(active_addons) AS addon_info
FROM (
SELECT
*,
-- retain the most recent active day for each client
row_number() OVER (
PARTITION BY client_id
ORDER BY submission_date_s3 DESC
) AS idx
FROM sample
)
WHERE idx = 1
)
SELECT
locale,
client_id,
addon_info.addon_id as addon
FROM sample_dedup
WHERE
addon_info.blocklisted = FALSE -- not blocklisted
AND addon_info.type = 'extension' -- nice webextensions only
AND addon_info.signed_state = 2 -- fully reviewed addons only
AND addon_info.user_disabled = FALSE -- active addons only get counted
AND addon_info.app_disabled = FALSE -- exclude compatibility disabled addons
AND addon_info.is_system = FALSE -- exclude system addons
AND locale <> 'null'
AND addon_info.addon_id IS NOT NULL
"""
end_date_filter = (
"AND submission_date_s3 <= '{}'".format(end_date) if end_date else ""
)
addons_query = addons_query_template.format(
start_date=start_date, end_date_filter=end_date_filter
)
return spark.sql(addons_query)
def limit_client_addons(spark, client_addons_df, addon_limits, whitelist):
"""Limit the number of add-ons associated with a single client ID.
This is a part of the privacy protection mechanism applied to the raw data.
For each client in the dataset, we retain a randomly selected subset of
their add-ons which belong to the whitelist. The max number of add-ons may
differ by locale.
:param client_addons_df: a DF listing add-on IDs by client ID and locale, as
generated by `get_client_addons()`
:param addon_limits: a dict mapping locale strings to ints representing the
max number of add-ons retained per client in that locale.
Any locale not present in the dict is excluded from the
final dataset.
:param whitelist: a list of add-on IDs belonging to the AMO whitelist
:return: a DF containing a subset of the rows of `client_addons_df`
"""
# Convert the dict of limits to a Spark DF that can be joined in.
addon_limits_schema = StructType(
[
StructField("locale", StringType(), False),
StructField("client_max_addons", IntegerType(), False),
]
)
addon_limits_df = spark.createDataFrame(
addon_limits.items(), schema=addon_limits_schema
)
# Inner join means that data for any locale not listed in the dict gets dropped.
client_df = client_addons_df.join(addon_limits_df, on="locale", how="inner")
# client_df now has columns (locale, client_id, addon, client_max_addons).
# Restrict to whitelist add-ons.
client_df = client_df.where(col("addon").isin(whitelist))
client_df.createOrReplaceTempView("client_addons")
# Limit each client's add-ons to a random subset of max allowed size.
# Add-ons are shuffled by generating an auxiliary column of independent Uniform(0,1)
# random variables and sorting along this column within clients.
# Shuffling only needs to be done if the user has more than the max allowed
# add-ons (otherwise they will all be retained).
return spark.sql(
"""
WITH sample AS
(
SELECT
WITH addons AS (
-- add a convenience column listing the number of add-ons each client has
SELECT
*,
COUNT(client_id) OVER (PARTITION BY client_id) AS num_client_addons
FROM client_addons
),
shuffle_ord AS (
-- add the auxiliary sorting column
SELECT
*,
-- only need to shuffle if the client has too many add-ons
CASE WHEN num_client_addons > client_max_addons THEN RAND() ELSE NULL
END AS ord
FROM addons
)
SELECT
client_id,
locale AS locality,
EXPLODE(active_addons)
FROM
clients_daily
WHERE
channel='release' AND
app_name='Firefox'
),
filtered_sample AS (
SELECT
locality,
col.addon_id as addon_key
FROM
sample
WHERE
col.blocklisted = FALSE -- not blocklisted
AND col.type = 'extension' -- nice webextensions only
AND col.signed_state = 2 -- fully reviewed addons only
AND col.user_disabled = FALSE -- active addons only get counted
AND col.app_disabled = FALSE -- exclude compatibility disabled
AND col.is_system = FALSE -- exclude system addons
AND locality <> 'null'
AND col.addon_id is not null
),
country_addon_pairs AS (
SELECT
COUNT(*) AS pair_cnts, addon_key, locality
from filtered_sample
GROUP BY locality, addon_key
locale,
addon
FROM (
SELECT
*,
row_number() OVER (PARTITION BY client_id ORDER BY ord) AS idx
FROM shuffle_ord
)
SELECT
pair_cnts,
addon_key,
locality
FROM
country_addon_pairs
ORDER BY locality, pair_cnts DESC
"""
WHERE idx <= client_max_addons
"""
)
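# Illustrative sketch (hypothetical helper, not in the actual module): the same
# per-client capping expressed in plain Python, assuming rows of
# (client_id, locale, addon) tuples plus the limits dict and whitelist used
# above. Shown only to make the intent of the SQL concrete.
def _cap_addons_in_memory(rows, addon_limits, whitelist):
    import random
    from collections import defaultdict

    by_client = defaultdict(list)
    for client_id, locale, addon in rows:
        # Drop locales without a limit and add-ons outside the whitelist.
        if locale in addon_limits and addon in whitelist:
            by_client[(client_id, locale)].append(addon)
    capped = []
    for (client_id, locale), addons in by_client.items():
        # Retain a random subset of at most the locale's per-client limit.
        random.shuffle(addons)
        capped.extend((client_id, locale, a) for a in addons[: addon_limits[locale]])
    return capped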
def compute_threshold(addon_df):
""" Get a threshold to remove locales with a small
number of addons installations.
def compute_noisy_counts(locale_addon_counts, addon_limits, whitelist, eps=EPSILON):
"""Apply DP protections to the raw per-locale add-on frequency counts.
Laplace noise is added to each of the counts. Additionally, each per-locale
set of frequency counts is expanded to include every add-on in the
whitelist, even if some were not observed in the raw data.
This computation is done in local memory, rather than in Spark, to simplify
working with random number generation. This relies on the assumption that
the number of unique locales and whitelist add-ons each remain small (on the
order of 100-1000).
:param locale_addon_counts: a Pandas DF of per-locale add-on frequency
counts, with columns `locale`, `addon`, `count`
:param addon_limits: a dict mapping locale strings to ints representing the
max number of add-ons retained per client in that locale.
Any locale not present in the dict is excluded from the
final dataset.
:param whitelist: a list of add-on IDs belonging to the AMO whitelist
:param eps: the DP epsilon parameter, representing the privacy budget
:return: a DF with the same structure as `locale_addon_counts`. Counts may
now be non-integer and negative.
"""
addon_install_counts = addon_df.groupBy("locality").agg({"pair_cnts": "sum"})
# Compute a threshold at the 25th percentile to remove locales with a
# small number of addons installations.
locale_pop_threshold = addon_install_counts.approxQuantile(
"sum(pair_cnts)", [0.25], 0.2
)[0]
# Safety net in case the distribution gets really skewed, we should
# require 2000 addon installation instances to make recommendations.
if locale_pop_threshold < 2000:
locale_pop_threshold = 2000
# Filter the list to only include locales including a sufficient
# number of addon installations. Include number of addons in locales
# that satisfy the threshold condition.
addon_locale_counts = addon_install_counts.filter(
(col("sum(pair_cnts)") >= locale_pop_threshold)
# First expand the frequency count table to include all whitelisted add-ons
# in each locale.
locale_wl_addons = DataFrame.from_records(
[(loc, a) for loc in addon_limits.keys() for a in whitelist],
columns=["locale", "addon"],
)
raw_counts = locale_addon_counts.set_index(["locale", "addon"])
locale_wl = locale_wl_addons.set_index(["locale", "addon"])
# Left join means the result will have every add-on in the whitelist and
# only the locales in the limits dict.
expanded_counts = locale_wl.join(raw_counts, how="left").fillna(0)
return addon_locale_counts
# Add the Laplace noise.
for locale in expanded_counts.index.unique("locale"):
# The scale parameter depends on the max number of add-ons per client,
# which varies by locale.
locale_laplace_param = float(addon_limits[locale]) / eps
# Select counts for all add-ons in the current locale.
locale_idx = IndexSlice[locale, :]
locale_counts = expanded_counts.loc[locale_idx, "count"]
locale_counts += rlaplace(scale=locale_laplace_param, size=len(locale_counts))
expanded_counts.loc[locale_idx, "count"] = locale_counts
return expanded_counts.reset_index()
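# Illustrative usage sketch (hypothetical helper, not in the actual module):
# compute_noisy_counts() applied to a tiny hand-built frame. "guid-3" is in
# the whitelist but was never observed, so it enters with a raw count of 0
# before Laplace(scale = 1 / EPSILON = 2.5) noise is added; the resulting
# counts may be non-integer or negative, and they change from run to run.
def _example_noisy_counts():
    toy_counts = DataFrame.from_records(
        [("en-US", "guid-1", 5), ("en-US", "guid-2", 2)],
        columns=["locale", "addon", "count"],
    )
    return compute_noisy_counts(
        toy_counts, {"en-US": 1}, ["guid-1", "guid-2", "guid-3"]
    )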
def transform(addon_df, addon_locale_counts_df, num_addons):
""" Converts the locale-specific addon data in to a dictionary.
def get_addon_limits_by_locale(client_addons_df):
"""Determine the max number of add-ons per user in each locale.
:param addon_df: the locale-specific addon dataframe;
:param addon_locale_counts_df: total addon-installs per locale;
:param num_addons: requested number of recommendations.
:return: a dictionary {<locale>: [['GUID1', 1.0], ['GUID2', 0.9], ...]}
We allow for the possibility of basing this on a summary statistic computed
from the original data, in which case the limits will remain private.
:param client_addons_df: a DF listing add-on IDs by client ID and locale, as
generated by `get_client_addons()`
:return: a dict mapping locale strings to their add-on limits
"""
# Helper function to normalize addon installations in a lambda.
def normalize_cnts(p):
loc_norm = float(p["pair_cnts"]) / float(p["sum(pair_cnts)"])
return loc_norm
# Instantiate an empty dict.
top10_per = {}
# Join addon pair counts with total addon installs per locale.
# need to clone the DFs to workaround for SPARK bug#14948
# https://issues.apache.org/jira/browse/SPARK-14948
df1 = addon_locale_counts_df.alias("df1")
df2 = addon_df.alias("df2")
combined_df = addon_df.join(df1, df2.locality == df1.locality).drop(df1.locality)
# Normalize installation rate per locale.
normalized_installs = combined_df.rdd.map(
lambda p: Row(
addon_key=p["addon_key"], locality=p["locality"], loc_norm=normalize_cnts(p)
)
).toDF()
# Groupby locale and sort by normalized install rate
window = Window.partitionBy(normalized_installs["locality"]).orderBy(
desc("loc_norm")
)
# Truncate results exceeding the required number of addons.
truncated_df = (
normalized_installs.select("*", rank().over(window).alias("rank"))
.filter(col("rank") <= num_addons)
.drop(col("rank"))
)
list_of_locales = [
x[0] for x in truncated_df.select(truncated_df.locality).distinct().collect()
]
# There is probably a *much* smarter way of doing this...
# but alas, I am le tired.
for specific_locale in list_of_locales:
# Most popular addons per locale sorted by normalized
# number of installs.
top10_per[specific_locale] = [
[x["addon_key"], x["loc_norm"]]
for x in truncated_df.filter(
truncated_df.locality == specific_locale
).collect()
]
return top10_per
# FIXME:
# For now, set add-on limits to 1 per client for each locale observed in
# the dataset.
all_locales = client_addons_df.select("locale").distinct().collect()
return {r["locale"]: 1 for r in all_locales}
def generate_dictionary(spark, num_addons):
""" Wrap the dictionary generation functions in an
easily testable way.
def get_protected_locale_addon_counts(spark, client_addons_df):
"""Compute DP-protected per-locale add-on frequency counts.
Privacy-preserving counts are generated using the Laplace mechanism,
restricting to add-ons in the AMO whitelist.
:param client_addons_df: a DF listing add-on IDs by client ID and locale, as
generated by `get_client_addons()`
:return: a Pandas DF with columns `locale`, `addon`, `count` containing
DP-protected counts for each whitelist add-on within each locale.
Unlike the true counts, weights may be non-integer or negative.
"""
# Execute spark.SQL query to get fresh addons from clients_daily.
addon_df = get_addons(spark)
# Load external whitelist based on AMO data.
amo_whitelist = load_amo_curated_whitelist()
# Filter to include only addons present in AMO whitelist.
addon_df_filtered = addon_df.where(col("addon_key").isin(amo_whitelist))
# Determine the max number of add-ons per user in each locale.
locale_addon_limits = get_addon_limits_by_locale(client_addons_df)
# Make sure not to include addons from very small locales.
addon_locale_counts_df = compute_threshold(addon_df_filtered)
return transform(addon_df_filtered, addon_locale_counts_df, num_addons)
# Limit the number of add-ons per client.
limited_addons_df = limit_client_addons(
spark, client_addons_df, locale_addon_limits, amo_whitelist
)
# Aggregate to a Pandas DF of locale/add-on frequency counts.
locale_addon_counts = (
limited_addons_df.groupBy("locale", "addon").count().toPandas()
)
# Add noise to the frequency counts.
noisy_addon_counts = compute_noisy_counts(
locale_addon_counts, locale_addon_limits, amo_whitelist, EPSILON
)
return noisy_addon_counts
def get_top_addons_by_locale(addon_counts, num_addons):
"""Generate a dictionary of top-weighted add-ons by locale.
Raw counts are normalized by converting to relative proportions.
:param addon_counts: a Pandas DF of per-locale add-on counts, with columns
`locale`, `addon`, `count`.
:param num_addons: requested number of recommendations.
:return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
"""
top_addons = {}
addons_by_locale = addon_counts.set_index(["locale", "addon"])
for locale in addons_by_locale.index.unique("locale"):
# For each locale, work with a Series of counts indexed by add-on.
counts = addons_by_locale.loc[locale, "count"]
# Shift counts so as to align the smallest count with 0 in each locale.
# Since DP-protected counts can be negative, this is done to avoid
# problems computing the sum.
shifted_counts = counts - counts.min()
rel_counts = shifted_counts / shifted_counts.sum()
top_rel_counts = rel_counts.sort_values(ascending=False).head(num_addons)
top_addons[locale] = list(top_rel_counts.iteritems())
return top_addons
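# Illustrative worked example (hypothetical helper, not in the actual module):
# the shift-and-normalize step above for a single locale. Noisy counts of
# 5, 2, 1 shift to 4, 1, 0 after subtracting the minimum, then normalize to
# weights 0.8, 0.2, 0.0, which are the relative proportions reported per GUID.
def _example_normalized_weights(counts=(5.0, 2.0, 1.0)):
    shifted = [c - min(counts) for c in counts]  # [4.0, 1.0, 0.0]
    return [s / sum(shifted) for s in shifted]  # [0.8, 0.2, 0.0]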
def generate_dictionary(spark, num_addons, dataset_num_days):
"""Compile lists of top add-ons by locale from per-client add-on data.
Runs a fresh data pull against `clients_daily`, computes DP-protected
frequency counts, and generates a weighted list of top add-ons by locale.
:param num_addons: number of add-on recommendations to report for each locale
:param dataset_num_days: number of days the raw data should cover
:return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
as returned by `get_top_addons_by_locale()`
"""
# Execute spark.SQL query to get fresh addons from clients_daily.
# Add an extra day to the date range since the latest date in the dataset
# tends to lag 1 day behind.
earliest_date = date.today() - timedelta(days=dataset_num_days + 1)
earliest_date_fmt = earliest_date.strftime("%Y%m%d")
addon_df = get_client_addons(spark, earliest_date_fmt)
# Compute DP-protected frequency-based counts for each add-on by locale.
addon_df_protected = get_protected_locale_addon_counts(spark, addon_df)
# Find the top add-ons in each locale based on these counts.
top_addons = get_top_addons_by_locale(addon_df_protected, num_addons)
return top_addons
@click.command()
@@ -196,13 +335,14 @@ def generate_dictionary(spark, num_addons):
@click.option("--bucket", default="telemetry-private-analysis-2")
@click.option("--prefix", default="taar/locale/")
@click.option("--num_addons", default=10)
def main(date, bucket, prefix, num_addons):
@click.option("--client_data_num_days", default=28)
def main(date, bucket, prefix, num_addons, client_data_num_days):
spark = (
SparkSession.builder.appName("taar_locale").enableHiveSupport().getOrCreate()
)
logger.info("Processing top N addons per locale")
locale_dict = generate_dictionary(spark, num_addons)
locale_dict = generate_dictionary(spark, num_addons, client_data_num_days)
store_json_to_s3(
json.dumps(locale_dict, indent=2), LOCALE_FILE_NAME, date, prefix, bucket
)

View file

@@ -29,6 +29,7 @@ setup(
'click==6.7',
'click_datetime==0.2',
'numpy==1.13.3',
'pandas>=0.23.0',
'pyspark==2.3.1',
'pyspark_hyperloglog==2.1.1',
'python_moztelemetry==0.10.2',

View file

@@ -1,27 +1,28 @@
"""Test suite for TAAR Locale Job."""
import boto3
import json
import functools
import pytest
from moto import mock_s3
from mozetl.taar import taar_locale, taar_utils
from datetime import date, datetime
from numpy import repeat as np_repeat
from pandas import DataFrame
from mozetl.taar import taar_locale
from pyspark.sql.types import (
StructField,
StructType,
StringType,
LongType,
IntegerType,
BooleanType,
ArrayType,
)
from pyspark.sql.functions import substring_index
clientsdaily_schema = StructType(
CLIENTS_DAILY_SCHEMA = StructType(
[
StructField("client_id", StringType(), True),
StructField("submission_date_s3", StringType(), True),
StructField("channel", StringType(), True),
StructField("city", StringType(), True),
StructField("subsession_hours_sum", LongType(), True),
StructField("os", StringType(), True),
StructField("app_name", StringType(), True),
StructField("locale", StringType(), True),
StructField(
@@ -35,142 +36,481 @@ clientsdaily_schema = StructType(
StructField("app_disabled", BooleanType(), True),
StructField("blocklisted", BooleanType(), True),
StructField("foreign_install", BooleanType(), True),
StructField("has_binary_components", BooleanType(), True),
StructField("install_day", LongType(), True),
StructField("is_system", BooleanType(), True),
StructField("is_web_extension", BooleanType(), True),
StructField("multiprocess_compatible", BooleanType(), True),
StructField("name", StringType(), True),
StructField("scope", LongType(), True),
StructField("signed_state", LongType(), True),
StructField("type", StringType(), True),
StructField("update_day", LongType(), True),
StructField("user_disabled", BooleanType(), True),
StructField("version", StringType(), True),
]
),
True,
),
),
StructField("places_bookmarks_count_mean", LongType(), True),
StructField(
"scalar_parent_browser_engagement_tab_open_event_count_sum",
LongType(),
True,
),
StructField(
"scalar_parent_browser_engagement_total_uri_count_sum", LongType(), True
),
StructField(
"scalar_parent_browser_engagement_unique_domains_count_mean",
LongType(),
True,
),
]
)
default_sample = {
"client_id": "client-id",
# Templates for a single `active_addons` entry.
ADDON_TEMPLATES = {
"good": {
"addon_id": "<guid>",
"app_disabled": False,
"blocklisted": False,
"foreign_install": False,
"is_system": False,
"signed_state": 2,
"type": "extension",
"user_disabled": False,
},
"bad1": {
"addon_id": "<guid>",
"app_disabled": False,
"blocklisted": False,
"foreign_install": False,
"is_system": True,
"signed_state": 2,
"type": "extension",
"user_disabled": False,
},
"bad2": {
"addon_id": "<guid>",
"app_disabled": False,
"blocklisted": False,
"foreign_install": False,
"is_system": False,
"signed_state": 0,
"type": "extension",
"user_disabled": True,
},
}
# Template for a clients_daily row.
ROW_TEMPLATE = {
"client_id": "<client_id>",
"submission_date_s3": "<yyyymmdd>",
"channel": "release",
"app_name": "Firefox",
"locale": "en-US",
"active_addons": [
{
"addon_id": "test-guid-0001",
"app_disabled": False,
"blocklisted": False,
"foreign_install": False,
"is_system": False,
"signed_state": 2,
"type": "extension",
"user_disabled": False,
},
{
"addon_id": "non-whitelisted-addon",
"app_disabled": False,
"blocklisted": False,
"foreign_install": False,
"is_system": False,
"signed_state": 2,
"type": "extension",
"user_disabled": False,
},
],
"active_addons": "<addons_list>",
}
FAKE_AMO_DUMP = {
"test-guid-0001": {
"name": {"en-US": "test-amo-entry-1"},
"default_locale": "en-US",
"current_version": {
"files": [
{
"status": "public",
"platform": "all",
"id": 1,
"is_webextension": True,
}
]
},
"guid": "test-guid-0001",
# Condensed client data.
# Generate snippets using `generate_rows_for_client()`.
SAMPLE_CLIENT_DATA = {
"client-1": {
"20190115": ["guid-1", "guid-5", "guid-bad1"],
"20190113": ["guid-1", "guid-4", "guid-bad1"],
"20190112": ["guid-1", "guid-3", "guid-bad1"],
"20190110": ["guid-1", "guid-bad1"],
},
"test-guid-0002": {
"name": {"en-US": "test-amo-entry-2"},
"default_locale": "en-US",
"current_version": {
"files": [
{
"status": "public",
"platform": "all",
"id": 2,
"is_webextension": False,
}
]
},
"guid": "test-guid-0002",
"client-2": {
"20190114": ["guid-2", "guid-bad2"],
"20190112": ["guid-1", "guid-bad2"],
},
"client-3": {"20190109": ["guid-1"]},
"client-4": {"20190112": ["guid-1", "guid-2"]},
"client-5": {"20190114": [], "20190113": []},
"client-6": {"20190114": ["guid-1"], "20190112": ["guid-1"]},
"client-7": {
"20190114": [
"guid-1",
"guid-2",
"guid-3",
"guid-4",
"guid-5",
"guid-not-whitelisted",
]
},
}
# Per-locale add-on count records.
SAMPLE_ADDON_COUNTS = [
("en-US", "guid-1", 5),
("en-US", "guid-2", 2),
("en-US", "guid-3", 1),
("de", "guid-1", 3),
("de", "guid-2", 2),
("de", "guid-3", 4),
]
@pytest.fixture()
# Boundary dates to use when querying the mocked `clients_daily`.
DATE_RANGE = {"start": "20190112", "end": "20190114"}
LOCALE_LIMITS = {"en-US": 1, "de": 3, "pl": 7}
AMO_WHITELIST = ["guid-1", "guid-2", "guid-3", "guid-4", "guid-5", "guid-not-installed"]
def generate_addon_entry(guid):
"""Generate an `active_addons` entry with the given GUID.
The desired type ('good'/'badN') can optionally be tagged on to the GUID
as a suffix delimited by '-'.
"""
guid_suffix = guid.split("-")[-1]
addon_type = guid_suffix if guid_suffix.startswith("bad") else "good"
entry = dict(ADDON_TEMPLATES[addon_type])
entry["addon_id"] = guid
return entry
def generate_rows_for_client(client_id, client_data):
"""Generate dataframe_factory snippets with the given client ID from the
condensed format
{
'<date>': [ <addon1>, ... ],
...
},
where <addon> will be passed to `generate_addon_entry()`. Ordering of the
rows is not guaranteed.
"""
snippets = []
for subm_date, addons in client_data.items():
snippets.append(
{
"client_id": client_id,
"submission_date_s3": subm_date,
"active_addons": [generate_addon_entry(a) for a in addons],
}
)
return snippets
@pytest.fixture
def generate_data(dataframe_factory):
return functools.partial(
dataframe_factory.create_dataframe,
base=default_sample,
schema=clientsdaily_schema,
base=ROW_TEMPLATE,
schema=CLIENTS_DAILY_SCHEMA,
)
@pytest.fixture
def multi_locales_df(generate_data):
LOCALE_COUNTS = {"en-US": 50, "en-GB": 60, "it-IT": 2500}
sample_snippets = []
counter = 0
for locale, count in LOCALE_COUNTS.items():
for i in range(count):
variation = {"locale": locale, "client_id": "client-{}".format(counter)}
sample_snippets.append(variation)
counter = counter + 1
return generate_data(sample_snippets)
def locale_limits():
return dict(LOCALE_LIMITS)
@mock_s3
def test_generate_dictionary(spark, multi_locales_df):
conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=taar_utils.AMO_DUMP_BUCKET)
@pytest.fixture
def whitelist():
return list(AMO_WHITELIST)
# Store the data in the mocked bucket.
conn.Object(
taar_utils.AMO_DUMP_BUCKET, key=taar_utils.AMO_CURATED_WHITELIST_KEY
).put(Body=json.dumps(FAKE_AMO_DUMP))
multi_locales_df.createOrReplaceTempView("clients_daily")
@pytest.fixture
def short_locale_limits(locale_limits):
"""A condensed version of the locale limits."""
del locale_limits["pl"]
return locale_limits
# The "en-US" locale must not be reported: we set it to a low
# frequency on |multi_locale_df|.
expected = {"it-IT": [["test-guid-0001", 1.0]]}
actual = taar_locale.generate_dictionary(spark, 5)
@pytest.fixture
def short_whitelist(whitelist):
"""A condensed version of the whitelist."""
for x in ["guid-4", "guid-5"]:
whitelist.remove(x)
return whitelist
@pytest.fixture
def addon_counts():
return DataFrame.from_records(
SAMPLE_ADDON_COUNTS, columns=["locale", "addon", "count"]
)
@pytest.fixture
def addon_count_neg(addon_counts):
"""Add-on counts with a negative count, as could occur from adding noise."""
replace_row = (addon_counts["locale"] == "de") & (addon_counts["addon"] == "guid-1")
addon_counts.loc[replace_row, "count"] = -1
return addon_counts
@pytest.fixture
def client_addons_df(generate_data, locale_limits):
"""Returns a mock `clients_daily` Spark DF."""
snippets = []
for locale, maxn in locale_limits.items():
# Copy all the clients for each locale, tagging the client ID with
# the locale.
for cid, cdata in SAMPLE_CLIENT_DATA.items():
tagged_cid = "{}_{}".format(locale, cid)
client_snippets = generate_rows_for_client(tagged_cid, cdata)
for s in client_snippets:
s["locale"] = locale
snippets.extend(client_snippets)
# Add a dummy locale that should get dropped in processing.
client_snippets = generate_rows_for_client(
"client_fr", SAMPLE_CLIENT_DATA["client-4"]
)
for s in client_snippets:
s["locale"] = "fr"
snippets.extend(client_snippets)
df = generate_data(snippets)
df.createOrReplaceTempView("clients_daily")
return df
@pytest.fixture
def short_client_df(client_addons_df):
"""A condensed version of the `clients_daily` data."""
df = client_addons_df.where("locale in ('en-US', 'de')")
df.createOrReplaceTempView("clients_daily")
return df
@pytest.fixture
def multi_day_client_df(client_addons_df):
"""A single-locale version of the `clients_daily` data."""
df = (
client_addons_df.where("locale = 'en-US'")
.withColumn("client_id", substring_index("client_id", "_", -1))
.where(substring_index("client_id", "-", -1).isin([1, 2, 3, 4, 5]))
)
df.createOrReplaceTempView("clients_daily")
return df
@pytest.fixture
def mock_sql_rand(spark):
"""Patch Spark SQL `RAND` to return a constant value.
This will ensure deterministic selection of the add-on subsets, assuming
that sorting maintains the ordering of `active_addons`.
"""
def mock_rand():
return 1
spark.udf.register("rand", mock_rand, IntegerType())
yield
# Afterwards, reset all temporary catalog items to undo the patching.
spark.catalog._reset()
@pytest.fixture()
def mock_rlaplace(monkeypatch):
"""Patch the Laplace noise generation to make results deterministic."""
def mock_noise(scale, size):
"""Return `size` copies of the `scale` parameter."""
return np_repeat(scale, size)
monkeypatch.setattr(taar_locale, "rlaplace", mock_noise)
@pytest.fixture()
def mock_amo_whitelist(monkeypatch, short_whitelist):
"""Patch the whitelist loader to just use the one defined here."""
monkeypatch.setattr(
taar_locale, "load_amo_curated_whitelist", lambda: short_whitelist
)
@pytest.fixture()
def mock_today(monkeypatch):
"""Patch `date.today()` to 2019-01-15, the latest date in the dataset."""
class MockDate:
@classmethod
def today(cls):
return date(2019, 1, 15)
monkeypatch.setattr(taar_locale, "date", MockDate)
def spark_df_to_records(df):
"""Dump a Spark DF as a list of tuples representing rows ('records')."""
return [tuple(r) for r in df.collect()]
def pandas_df_to_records(df):
"""Convert a Pandas DF to a list of tuples representing rows ('records')."""
return df.to_records(index=False).tolist()
def same_rows(rows_list_1, rows_list_2):
"""Compare DF rows represented as lists of tuples ('records').
Checks that the lists contain the same tuples (possibly in different orders).
"""
return sorted(rows_list_1) == sorted(rows_list_2)
# =========================================================================== #
def test_get_client_addons(spark, multi_day_client_df):
# Test date range limited on both ends.
client_df = taar_locale.get_client_addons(
spark, DATE_RANGE["start"], DATE_RANGE["end"]
)
actual = spark_df_to_records(client_df)
expected = [
# client-1 retains the 20190113 record, dropping the bad add-on
("en-US", "client-1", "guid-1"),
("en-US", "client-1", "guid-4"),
# client-2 retains the 20190114 record, dropping the bad add-on
("en-US", "client-2", "guid-2"),
# client-3 is dropped (outside date range)
# client-4 retains the 20190112 record
("en-US", "client-4", "guid-1"),
("en-US", "client-4", "guid-2")
# client-5 is dropped (no add-ons)
]
assert same_rows(actual, expected)
# Test date range limited by earliest only.
client_df_no_end = taar_locale.get_client_addons(spark, DATE_RANGE["start"])
actual = spark_df_to_records(client_df_no_end)
expected = [
# client-1 retains the 20190115 record, dropping the bad add-on
("en-US", "client-1", "guid-1"),
("en-US", "client-1", "guid-5"),
# client-2 retains the 20190114 record, dropping the bad add-on
("en-US", "client-2", "guid-2"),
# client-3 is dropped (outside date range)
# client-4 retains the 20190112 record
("en-US", "client-4", "guid-1"),
("en-US", "client-4", "guid-2")
# client-5 is dropped (no add-ons)
]
assert same_rows(actual, expected)
def test_limit_client_addons(
spark, client_addons_df, locale_limits, whitelist, mock_sql_rand
):
client_df = taar_locale.get_client_addons(spark, DATE_RANGE["start"])
limited_df = taar_locale.limit_client_addons(
spark, client_df, locale_limits, whitelist
)
actual = spark_df_to_records(limited_df)
expected = [
# en-US clients keep 1 add-on
# client-1 retains the 20190115 record
("en-US_client-1", "en-US", "guid-1"),
# client-2 retains the 20190114 record
("en-US_client-2", "en-US", "guid-2"),
# client-4 retains the 20190112 record
("en-US_client-4", "en-US", "guid-1"),
# client-6 retains the 20190114 record
("en-US_client-6", "en-US", "guid-1"),
# client-7 retains the 20190114 record
("en-US_client-7", "en-US", "guid-1"),
# de clients keep 3 add-ons
# client-1 retains the 20190115 record
("de_client-1", "de", "guid-1"),
("de_client-1", "de", "guid-5"),
# client-2 retains the 20190114 record
("de_client-2", "de", "guid-2"),
# client-4 retains the 20190112 record
("de_client-4", "de", "guid-1"),
("de_client-4", "de", "guid-2"),
# client-6 retains the 20190114 record
("de_client-6", "de", "guid-1"),
# client-7 retains the 20190114 record
("de_client-7", "de", "guid-1"),
("de_client-7", "de", "guid-2"),
("de_client-7", "de", "guid-3"),
# pl clients keep 7 add-ons
# client-1 retains the 20190115 record
("pl_client-1", "pl", "guid-1"),
("pl_client-1", "pl", "guid-5"),
# client-2 retains the 20190114 record
("pl_client-2", "pl", "guid-2"),
# client-4 retains the 20190112 record
("pl_client-4", "pl", "guid-1"),
("pl_client-4", "pl", "guid-2"),
# client-6 retains the 20190114 record
("pl_client-6", "pl", "guid-1"),
# client-7 retains the 20190114 record,
# dropping the non-whitelisted add-on
("pl_client-7", "pl", "guid-1"),
("pl_client-7", "pl", "guid-2"),
("pl_client-7", "pl", "guid-3"),
("pl_client-7", "pl", "guid-4"),
("pl_client-7", "pl", "guid-5")
# fr locale is dropped because it is not in the locale dict
]
assert same_rows(actual, expected)
def test_compute_noisy_counts(
addon_counts, short_locale_limits, short_whitelist, mock_rlaplace
):
noisy_df = taar_locale.compute_noisy_counts(
addon_counts, short_locale_limits, short_whitelist
)
actual = pandas_df_to_records(noisy_df)
expected = [
# The mock noisification adds the Laplace scale parameter
# to the count directly.
# The function is using taar_locale.EPSILON = 0.4.
# For en-US, it is 1.0 / 0.4 = 2.5.
("en-US", "guid-1", 7.5),
("en-US", "guid-2", 4.5),
("en-US", "guid-3", 3.5),
# The whitelist add-on which was not installed in any profile
# gets added to the raw count DF with a count of 0.
("en-US", "guid-not-installed", 2.5),
# For de, the scale parameter is 3.0 / 0.4 = 7.5.
("de", "guid-1", 10.5),
("de", "guid-2", 9.5),
("de", "guid-3", 11.5),
("de", "guid-not-installed", 7.5),
]
assert same_rows(actual, expected)
def test_get_protected_and_dictionary(
spark, short_client_df, mock_sql_rand, mock_rlaplace, mock_amo_whitelist, mock_today
):
client_df = taar_locale.get_client_addons(spark, DATE_RANGE["start"])
noisy_df = taar_locale.get_protected_locale_addon_counts(spark, client_df)
actual = pandas_df_to_records(noisy_df)
expected = [
# The mock noisification adds the Laplace scale parameter
# to the count directly.
# The function is using taar_locale.EPSILON = 0.4 and setting each
# locale limit to 1.
# The scale parameters are 1.0 / 0.4 = 2.5.
("en-US", "guid-1", 6.5),
("en-US", "guid-2", 3.5),
# The next two add-ons are not installed in the sample profiles,
# so they have a raw count of 0.
("en-US", "guid-3", 2.5),
("en-US", "guid-not-installed", 2.5),
("de", "guid-1", 6.5),
("de", "guid-2", 3.5),
("de", "guid-3", 2.5),
("de", "guid-not-installed", 2.5),
]
assert same_rows(actual, expected)
# To test `generate_dictionary()`, we need to mock `date.today()`.
# Make sure that the `dataset_num_days` arg works out to give us
# `DATE_RANGE["start"]` as the earliest date.
start_date = datetime.strptime(DATE_RANGE["start"], "%Y%m%d").date()
num_days = (taar_locale.date.today() - start_date).days
# `generate_dictionary()` adds 1 to the number of days.
num_days -= 1
actual = taar_locale.generate_dictionary(spark, 3, num_days)
# Noisy counts will be the same as listed in `expected` above.
# To generate the dictionary, the min value (2.5) is subtracted in each locale,
# counts are converted to relative proportions, and the list is ordered by
# decreasing value.
expected = {
"en-US": [("guid-1", 0.8), ("guid-2", 0.2), ("guid-not-installed", 0.0)],
"de": [("guid-1", 0.8), ("guid-2", 0.2), ("guid-not-installed", 0.0)],
}
assert actual == expected
def test_get_top_addons_by_locale(addon_count_neg):
actual = taar_locale.get_top_addons_by_locale(addon_count_neg, 3)
expected = {
"en-US": [("guid-1", 0.8), ("guid-2", 0.2), ("guid-3", 0.0)],
"de": [("guid-3", 0.625), ("guid-2", 0.375), ("guid-1", 0.0)],
}
assert actual == expected