Remerge #157 for weighted randomization (#171)

* Unified patch for #157

* Update ETL job links

Update documentation and removed unnecessary env variables.
Split up some test cases
This commit is contained in:
Victor Ng 2020-07-06 14:19:18 -04:00 коммит произвёл GitHub
Родитель 37a3fb7bbb
Коммит b185583d42
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 282 добавлений и 79 удалений

Просмотреть файл

@ -51,9 +51,9 @@ This is the ordered list of the currently supported models:
| Order | Model | Description | Conditions | Generator job | | Order | Model | Description | Conditions | Generator job |
|-------|-------|-------------|------------|---------------| |-------|-------|-------------|------------|---------------|
| 1 | [Collaborative](taar/recommenders/collaborative_recommender.py) | recommends add-ons based on add-ons installed by other users (i.e. [collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering))|Telemetry data is available for the user and the user has at least one enabled add-on|[source](https://github.com/mozilla/telemetry-batch-view/blob/master/src/main/scala/com/mozilla/telemetry/ml/AddonRecommender.scala)| | 1 | [Collaborative](taar/recommenders/collaborative_recommender.py) | recommends add-ons based on add-ons installed by other users (i.e. [collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering))|Telemetry data is available for the user and the user has at least one enabled add-on|[source](https://github.com/mozilla/telemetry-batch-view/blob/master/src/main/scala/com/mozilla/telemetry/ml/AddonRecommender.scala)|
| 2 | [Similarity](taar/recommenders/similarity_recommender.py) | recommends add-ons based on add-ons installed by similar representative users|Telemetry data is available for the user and a suitable representative donor can be found|[source](https://github.com/mozilla/python_mozetl/blob/master/mozetl/taar/taar_similarity.py)| | 2 | [Similarity](taar/recommenders/similarity_recommender.py) | recommends add-ons based on add-ons installed by similar representative users|Telemetry data is available for the user and a suitable representative donor can be found|[source](https://github.com/mozilla/telemetry-airflow/blob/master/jobs/taar_similarity.py)|
| 3 | [Locale](taar/recommenders/locale_recommender.py) |recommends add-ons based on the top addons for the user's locale|Telemetry data is available for the user and the locale has enough users|[source](https://github.com/mozilla/python_mozetl/blob/master/mozetl/taar/taar_locale.py)| | 3 | [Locale](taar/recommenders/locale_recommender.py) |recommends add-ons based on the top addons for the user's locale|Telemetry data is available for the user and the locale has enough users|[source](https://github.com/mozilla/telemetry-airflow/blob/master/jobs/taar_locale.py)|
| 4 | [Ensemble](taar/recommenders/ensemble_recommender.py) *|recommends add-ons based on the combined (by [stacked generalization](https://en.wikipedia.org/wiki/Ensemble_learning#Stacking)) recomendations of other available recommender modules.|More than one of the other Models are available to provide recommendations.|[source](https://github.com/mozilla/python_mozetl/blob/master/mozetl/taar/taar_ensemble.py)| | 4 | [Ensemble](taar/recommenders/ensemble_recommender.py) *|recommends add-ons based on the combined (by [stacked generalization](https://en.wikipedia.org/wiki/Ensemble_learning#Stacking)) recommendations of other available recommender modules.|More than one of the other Models are available to provide recommendations.|[source](https://github.com/mozilla/telemetry-airflow/blob/master/jobs/taar_ensemble.py)|
All jobs are scheduled in Mozilla's instance of All jobs are scheduled in Mozilla's instance of
[Airflow](https://github.com/mozilla/telemetry-airflow). The [Airflow](https://github.com/mozilla/telemetry-airflow). The

16
docs/randomized_tails.md Normal file
Просмотреть файл

@ -0,0 +1,16 @@
# Randomized tail selection of addons
The `TAAR_EXPERIMENT_PROB` setting is the probability that a user is placed
in the experiment that serves randomized recommendations.
Randomized recommendations are not fully random. The weights of the
recommendations are first normalized so that they sum to 1.0.
Using `numpy.random.choice`, we then draw a non-uniform random
sample from the list of suggestions without replacement. The normalized
weights are used as the vector of selection probabilities.
By default, `TAAR_EXPERIMENT_PROB` is set to 0.0, which in effect
disables the randomization feature.

Просмотреть файл

@ -11,7 +11,6 @@ import json
import zlib import zlib
import datetime import datetime
BIGTABLE_PROJECT_ID = config( BIGTABLE_PROJECT_ID = config(
"BIGTABLE_PROJECT_ID", default="cfr-personalization-experiment" "BIGTABLE_PROJECT_ID", default="cfr-personalization-experiment"
) )

Просмотреть файл

@ -2,12 +2,14 @@ from .collaborative_recommender import CollaborativeRecommender
from .locale_recommender import LocaleRecommender from .locale_recommender import LocaleRecommender
from .similarity_recommender import SimilarityRecommender from .similarity_recommender import SimilarityRecommender
from .recommendation_manager import RecommendationManager, RecommenderFactory from .recommendation_manager import RecommendationManager, RecommenderFactory
from .fixtures import hasher # noqa
__all__ = [ __all__ = [
'CollaborativeRecommender', "CollaborativeRecommender",
'LocaleRecommender', "LocaleRecommender",
'SimilarityRecommender', "SimilarityRecommender",
'RecommendationManager', "RecommendationManager",
'RecommenderFactory', "RecommenderFactory",
"hasher",
] ]

Просмотреть файл

@ -7,9 +7,17 @@ import itertools
from .base_recommender import AbstractRecommender from .base_recommender import AbstractRecommender
from .lazys3 import LazyJSONLoader from .lazys3 import LazyJSONLoader
from .s3config import TAAR_WHITELIST_BUCKET
from .s3config import TAAR_WHITELIST_KEY
from .s3config import TAAR_ENSEMBLE_BUCKET from .s3config import TAAR_ENSEMBLE_BUCKET
from .s3config import TAAR_ENSEMBLE_KEY from .s3config import TAAR_ENSEMBLE_KEY
from .fixtures import hasher
def is_test_client(client_id):
    """Return True when client_id consists of one repeated character.

    Dashes are ignored, so an id such as
    ``11111111-1111-1111-1111-111111111111`` counts as a test client.
    An id that is empty once the dashes are removed is not a test client.
    """
    distinct_chars = set(client_id.replace("-", ""))
    return len(distinct_chars) == 1
class WeightCache: class WeightCache:
def __init__(self, ctx): def __init__(self, ctx):
@ -48,6 +56,10 @@ class EnsembleRecommender(AbstractRecommender):
for rkey in self.RECOMMENDER_KEYS: for rkey in self.RECOMMENDER_KEYS:
self._recommender_map[rkey] = recommender_factory.create(rkey) self._recommender_map[rkey] = recommender_factory.create(rkey)
self._whitelist_data = LazyJSONLoader(
self._ctx, TAAR_WHITELIST_BUCKET, TAAR_WHITELIST_KEY
)
self._weight_cache = WeightCache(self._ctx.child()) self._weight_cache = WeightCache(self._ctx.child())
self.logger.info("EnsembleRecommender initialized") self.logger.info("EnsembleRecommender initialized")
@ -64,18 +76,26 @@ class EnsembleRecommender(AbstractRecommender):
return result return result
def recommend(self, client_data, limit, extra_data={}): def recommend(self, client_data, limit, extra_data={}):
try: client_id = client_data.get("client_id", "no-client-id")
results = self._recommend(client_data, limit, extra_data)
except Exception as e:
results = []
self._weight_cache._weights.force_expiry()
self.logger.exception(
"Ensemble recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
if is_test_client(client_id):
whitelist = self._whitelist_data.get()[0]
samples = whitelist[:limit]
self.logger.info("Test ID detected [{}]".format(client_id))
# Compute a stable weight for any whitelisted addon based
# on the sha256 hash of the GUID
p = [(int(hasher(s), 16) % 100) / 100.0 for s in samples]
results = list(zip(samples, p))
else:
try:
results = self._recommend(client_data, limit, extra_data)
except Exception as e:
results = []
self._weight_cache._weights.force_expiry()
self.logger.exception(
"Ensemble recommender crashed for {}".format(client_id), e
)
return results return results
def _recommend(self, client_data, limit, extra_data={}): def _recommend(self, client_data, limit, extra_data={}):
@ -120,7 +140,9 @@ class EnsembleRecommender(AbstractRecommender):
# group by the guid, sum up the weights for recurring GUID # group by the guid, sum up the weights for recurring GUID
# suggestions across all recommenders # suggestions across all recommenders
guid_grouper = itertools.groupby(flattened_results, lambda item: item[0]) guid_grouper = itertools.groupby(
flattened_results, lambda item: item[0]
)
ensemble_suggestions = [] ensemble_suggestions = []
for (guid, guid_group) in guid_grouper: for (guid, guid_group) in guid_grouper:
@ -141,10 +163,12 @@ class EnsembleRecommender(AbstractRecommender):
log_data = ( log_data = (
client_data["client_id"], client_data["client_id"],
extra_data.get("guid_randomization", False),
str(ensemble_weights), str(ensemble_weights),
str([r[0] for r in results]), str([r[0] for r in results]),
) )
self.logger.info( self.logger.info(
"client_id: [%s], ensemble_weight: [%s], guids: [%s]" % log_data "client_id: [%s], guid_randomization: [%s], ensemble_weight: [%s], guids: [%s]"
% log_data
) )
return results return results

Просмотреть файл

@ -0,0 +1,14 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
These are fixtures that are used for testing TAAR in a production
environment with known stable client_ids
"""
import hashlib
def hasher(client_id):
    """Return the hexadecimal SHA-256 digest of ``client_id``.

    The string is encoded as UTF-8 before it is hashed.
    """
    digest = hashlib.new("sha256")
    digest.update(client_id.encode("utf8"))
    return digest.hexdigest()

Просмотреть файл

@ -58,6 +58,7 @@ class CuratedRecommender(AbstractRecommender):
def recommend(self, client_data, limit, extra_data={}): def recommend(self, client_data, limit, extra_data={}):
""" """
Curated recommendations are just random selections Curated recommendations are just random selections
from the whitelist and we explicitly set the weighting to 1.0
""" """
guids = self._curated_wl.get_randomized_guid_sample(limit) guids = self._curated_wl.get_randomized_guid_sample(limit)

Просмотреть файл

@ -0,0 +1,40 @@
"""
This module re-orders the (GUID, weight) 2-tuples using
numpy.random.choice
"""
import numpy as np
def in_experiment(client_id, xp_prob=0.5):
    """
    Return whether or not this client_id is in the experiment.

    The hexadecimal characters of client_id (case-insensitive) are read
    as a single integer and reduced modulo 100 to a bucket in [0, 99].
    The client is in the experiment when ``bucket / 100 < xp_prob``, so
    the answer is stable for a given client_id and xp_prob.

    xp_prob is a probability between 0.0 and 1.0 which is the
    chance that the experimental branch is selected.  0.0 selects no
    client at all and 1.0 selects every client.

    Returns 1 when the client is in the experiment and 0 otherwise.
    """
    hex_client = "".join(
        c for c in client_id.lower() if c in "abcdef0123456789"
    )

    # An id without a single hex character would make int() raise;
    # fall back to bucket 0 so that such ids still follow xp_prob.
    int_client = int(hex_client or "0", 16)
    bucket = int_client % 100

    # Strict comparison on the bucket *fraction*:
    #  * `<` (not `<=`) keeps bucket 0 out of the experiment when
    #    xp_prob is 0.0, so 0.0 really disables the feature.
    #  * dividing the bucket instead of multiplying xp_prob avoids
    #    float artefacts such as 0.1 * 100 == 10.000000000000002.
    return int(bucket / 100.0 < xp_prob)
def reorder_guids(guid_weight_tuples, size=None):
    """
    This reorders (GUID, weight) 2-tuples based on the weight using
    random selection, without replacement.

    Entries with a positive weight are drawn first, with a probability
    proportional to their weight.  Entries whose weight is not positive
    can never be drawn by weight; they are only used, in uniformly
    random order, to fill the output once every positively weighted
    entry has been taken.

    @size denotes the length of the output.  When it is None, or larger
    than the number of input tuples, every input tuple is returned.

    Returns a list of the original (GUID, weight) tuples.  An empty
    input yields an empty list.
    """
    items = list(guid_weight_tuples)
    if not items:
        # np.random.choice refuses to sample from an empty population.
        return []

    if size is None or size > len(items):
        # Sampling without replacement cannot return more than we have.
        size = len(items)

    # Sample positions rather than GUID strings so that duplicate
    # GUIDs keep their own weight and no string array is built.
    weights = np.asarray([weight for (_, weight) in items], dtype=float)
    weighted = np.flatnonzero(weights > 0)
    unweighted = np.flatnonzero(weights <= 0)

    order = []
    n_weighted = min(size, len(weighted))
    if n_weighted > 0:
        # Normalize the weights so that they're probabilities
        probabilities = weights[weighted] / weights[weighted].sum()
        order.extend(
            np.random.choice(
                weighted, size=n_weighted, replace=False, p=probabilities
            )
        )

    n_remaining = size - len(order)
    if n_remaining > 0:
        order.extend(
            np.random.choice(unweighted, size=n_remaining, replace=False)
        )

    return [items[idx] for idx in order]

Просмотреть файл

@ -3,42 +3,22 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.recommenders.ensemble_recommender import EnsembleRecommender from taar.recommenders.ensemble_recommender import EnsembleRecommender
from taar.recommenders.randomizer import in_experiment, reorder_guids
from srgutil.interfaces import IMozLogging from srgutil.interfaces import IMozLogging
from taar.context import default_context from taar.context import default_context
from .lazys3 import LazyJSONLoader from .lazys3 import LazyJSONLoader
import random
from .s3config import TAAR_WHITELIST_BUCKET from .s3config import TAAR_WHITELIST_BUCKET
from .s3config import TAAR_WHITELIST_KEY from .s3config import TAAR_WHITELIST_KEY
from .s3config import TAAR_EXPERIMENT_PROB
import hashlib
# We need to build a default logger for the schema validation as there # We need to build a default logger for the schema validation as there
# is no class to bind to yet. # is no class to bind to yet.
ctx = default_context() ctx = default_context()
def hasher(client_id):
return hashlib.new("sha256", client_id.encode("utf8")).hexdigest()
TEST_CLIENT_IDS = [
hasher("00000000-0000-0000-0000-000000000000"),
hasher("11111111-1111-1111-1111-111111111111"),
hasher("22222222-2222-2222-2222-222222222222"),
hasher("33333333-3333-3333-3333-333333333333"),
]
EMPTY_TEST_CLIENT_IDS = [
hasher("00000000-aaaa-0000-0000-000000000000"),
hasher("11111111-aaaa-1111-1111-111111111111"),
hasher("22222222-aaaa-2222-2222-222222222222"),
hasher("33333333-aaaa-3333-3333-333333333333"),
]
class RecommenderFactory: class RecommenderFactory:
""" """
A RecommenderFactory provides support to create recommenders. A RecommenderFactory provides support to create recommenders.
@ -82,6 +62,10 @@ class RecommendationManager:
self._ctx, TAAR_WHITELIST_BUCKET, TAAR_WHITELIST_KEY self._ctx, TAAR_WHITELIST_BUCKET, TAAR_WHITELIST_KEY
) )
self._experiment_prob = ctx.get(
"TAAR_EXPERIMENT_PROB", TAAR_EXPERIMENT_PROB
)
def recommend(self, client_id, limit, extra_data={}): def recommend(self, client_id, limit, extra_data={}):
"""Return recommendations for the given client. """Return recommendations for the given client.
@ -93,24 +77,29 @@ class RecommendationManager:
:param extra_data: a dictionary with extra client data. :param extra_data: a dictionary with extra client data.
""" """
if client_id in TEST_CLIENT_IDS: results = None
data = self._whitelist_data.get()[0]
random.shuffle(data)
samples = data[:limit]
self.logger.info("Test ID detected [{}]".format(client_id))
return [(s, 1.1) for s in samples]
if client_id in EMPTY_TEST_CLIENT_IDS:
self.logger.info("Empty Test ID detected [{}]".format(client_id))
return []
client_info = self.profile_fetcher.get(client_id) client_info = self.profile_fetcher.get(client_id)
if client_info is None: if client_info is None:
self.logger.info( self.logger.info(
"Defaulting to empty results. No client info fetched from storage backend." "Defaulting to empty results. No client info fetched from storage backend."
) )
return [] results = []
results = self._ensemble_recommender.recommend(client_info, limit, extra_data) if in_experiment(client_id, self._experiment_prob):
if results is None:
# Fetch back all possible whitelisted addons for this
# client
extra_data["guid_randomization"] = True
whitelist = self._whitelist_data.get()[0]
results = self._ensemble_recommender.recommend(
client_info, len(whitelist), extra_data
)
results = reorder_guids(results, limit)
else:
if results is None:
results = self._ensemble_recommender.recommend(
client_info, limit, extra_data
)
return results return results

Просмотреть файл

@ -28,3 +28,5 @@ TAAR_SIMILARITY_DONOR_KEY = config(
TAAR_SIMILARITY_LRCURVES_KEY = config( TAAR_SIMILARITY_LRCURVES_KEY = config(
"TAAR_SIMILARITY_LRCURVES_KEY", default="test_similarity_lrcurves_key" "TAAR_SIMILARITY_LRCURVES_KEY", default="test_similarity_lrcurves_key"
) )
TAAR_EXPERIMENT_PROB = config("TAAR_EXPERIMENT_PROB", default=0.0)

Просмотреть файл

@ -104,13 +104,24 @@ def test_hybrid_recommendations(test_ctx):
# of recommendations # of recommendations
assert len(guid_list) == LIMIT assert len(guid_list) == LIMIT
@mock_s3
def test_stable_hybrid_results(test_ctx):
# verify that the recommendations mix the curated and
# ensemble results
ctx = install_mock_curated_data(test_ctx)
ctx = install_ensemble_fixtures(ctx)
r = HybridRecommender(ctx)
# Test that the results are actually mixed # Test that the results are actually mixed
guid_list = r.recommend({"client_id": "000000"}, limit=4) guid_list = r.recommend({"client_id": "000000"}, limit=4)
# A mixed list will have two recommendations with weight > 1.0 assert len(guid_list) == 4
# (ensemble) and 2 with exactly weight 1.0 from the curated list
assert guid_list[0][1] > 1.0 # A mixed list will have two recommendations with weight = 1.0
assert guid_list[1][1] > 1.0 # (curated) and 2 with exactly weight < 1.0 from the ensemble list
assert guid_list[2][1] == 1.0
assert guid_list[3][1] == 1.0 assert guid_list[0][1] == 1.0
assert guid_list[1][1] == 1.0
assert guid_list[2][1] < 1.0
assert guid_list[3][1] < 1.0

61
tests/test_randomizer.py Normal file
Просмотреть файл

@ -0,0 +1,61 @@
"""
Test that (GUID, weight) tuples can be reordered by weighted random
selection, and that experiment membership is stable for a client_id.
"""
from taar.recommenders.randomizer import reorder_guids
from taar.recommenders.randomizer import in_experiment
import numpy as np
from collections import Counter
def most_frequent(List):
    """Return the element that occurs most often in ``List``."""
    ranked = Counter(List).most_common(1)
    winner, _count = ranked[0]
    return winner
def test_reorder_guids():
    """The most common ordering over many runs follows the weights."""
    # The weights are far enough apart that the weighted random
    # selection settles on a single dominant ordering over a
    # sufficiently large number of runs.

    # Seed the generator so that every test run sees the same draws.
    np.random.seed(seed=42)

    guid_weight_tuples = [
        ("guid1", 0.01),
        ("guid2", 0.09),
        ("guid3", 0.30),
        ("guid4", 0.60),
    ]

    # Reorder 100 times, then take the most common entry per position.
    runs = [reorder_guids(guid_weight_tuples) for _ in range(100)]

    best_result = [
        most_frequent([run[position] for run in runs])[0]
        for position in range(4)
    ]
    assert best_result == ["guid4", "guid3", "guid2", "guid1"]
def test_experimental_branch_guid():
    """
    Exercise the experimental cutoff selection code.

    For a fixed probability and client_id the answer must never
    change: each (client, cutoff) pair is evaluated 100 times and has
    to give the same result every time.
    """
    for bucket in range(10, 100, 10):
        client_id = hex(bucket)[2:]
        cutoff = (bucket + 9.0) / 100

        # A cutoff above the client's bucket: always in the experiment.
        hits = sum(in_experiment(client_id, cutoff) for _ in range(100))
        assert hits == 100

        # A cutoff below the client's bucket: never in the experiment.
        hits = sum(
            in_experiment(client_id, cutoff - 0.1) for _ in range(100)
        )
        assert hits == 0

Просмотреть файл

@ -6,8 +6,6 @@ import boto3
import json import json
from moto import mock_s3 from moto import mock_s3
from taar.recommenders import RecommendationManager from taar.recommenders import RecommendationManager
from taar.recommenders.recommendation_manager import TEST_CLIENT_IDS
from taar.recommenders.recommendation_manager import EMPTY_TEST_CLIENT_IDS
from taar.recommenders.base_recommender import AbstractRecommender from taar.recommenders.base_recommender import AbstractRecommender
from taar.recommenders.ensemble_recommender import ( from taar.recommenders.ensemble_recommender import (
@ -19,6 +17,9 @@ from taar.recommenders.ensemble_recommender import (
from .mocks import MockRecommenderFactory from .mocks import MockRecommenderFactory
from .test_hybrid_recommender import install_mock_curated_data from .test_hybrid_recommender import install_mock_curated_data
import operator
from functools import reduce
class StubRecommender(AbstractRecommender): class StubRecommender(AbstractRecommender):
""" A shared, stub recommender that can be used for testing. """ A shared, stub recommender that can be used for testing.
@ -35,23 +36,32 @@ class StubRecommender(AbstractRecommender):
return self._recommendations return self._recommendations
def install_mocks(ctx): def install_mocks(ctx, mock_fetcher=None):
ctx = ctx.child() ctx = ctx.child()
class MockProfileFetcher: class DefaultMockProfileFetcher:
def get(self, client_id): def get(self, client_id):
return {"client_id": client_id} return {"client_id": client_id}
ctx["profile_fetcher"] = MockProfileFetcher() if mock_fetcher is None:
mock_fetcher = DefaultMockProfileFetcher()
ctx["profile_fetcher"] = mock_fetcher
ctx["recommender_factory"] = MockRecommenderFactory() ctx["recommender_factory"] = MockRecommenderFactory()
DATA = { DATA = {
"ensemble_weights": {"collaborative": 1000, "similarity": 100, "locale": 10} "ensemble_weights": {
"collaborative": 1000,
"similarity": 100,
"locale": 10,
}
} }
conn = boto3.resource("s3", region_name="us-west-2") conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=TAAR_ENSEMBLE_BUCKET) conn.create_bucket(Bucket=TAAR_ENSEMBLE_BUCKET)
conn.Object(TAAR_ENSEMBLE_BUCKET, TAAR_ENSEMBLE_KEY).put(Body=json.dumps(DATA)) conn.Object(TAAR_ENSEMBLE_BUCKET, TAAR_ENSEMBLE_KEY).put(
Body=json.dumps(DATA)
)
return ctx return ctx
@ -63,6 +73,7 @@ def test_none_profile_returns_empty_list(test_ctx):
class MockProfileFetcher: class MockProfileFetcher:
def get(self, client_id): def get(self, client_id):
return None return None
ctx["profile_fetcher"] = MockProfileFetcher() ctx["profile_fetcher"] = MockProfileFetcher()
rec_manager = RecommendationManager(ctx) rec_manager = RecommendationManager(ctx)
@ -87,9 +98,7 @@ def test_simple_recommendation(test_ctx):
] ]
manager = RecommendationManager(ctx.child()) manager = RecommendationManager(ctx.child())
recommendation_list = manager.recommend( recommendation_list = manager.recommend("some_ignored_id", 10)
"some_ignored_id", 10
)
assert isinstance(recommendation_list, list) assert isinstance(recommendation_list, list)
assert recommendation_list == EXPECTED_RESULTS assert recommendation_list == EXPECTED_RESULTS
@ -101,21 +110,56 @@ def test_fixed_client_id_valid(test_ctx):
ctx = install_mock_curated_data(ctx) ctx = install_mock_curated_data(ctx)
manager = RecommendationManager(ctx.child()) manager = RecommendationManager(ctx.child())
recommendation_list = manager.recommend( recommendation_list = manager.recommend('111111', 10)
TEST_CLIENT_IDS[0], 10
)
assert len(recommendation_list) == 10 assert len(recommendation_list) == 10
@mock_s3 @mock_s3
def test_fixed_client_id_empty_list(test_ctx): def test_fixed_client_id_empty_list(test_ctx):
class NoClientFetcher:
def get(self, client_id):
return None
ctx = install_mocks(test_ctx, mock_fetcher=NoClientFetcher())
ctx = install_mock_curated_data(ctx)
manager = RecommendationManager(ctx.child())
recommendation_list = manager.recommend("not_a_real_client_id", 10)
assert len(recommendation_list) == 0
@mock_s3
def test_experimental_randomization(test_ctx):
ctx = install_mocks(test_ctx) ctx = install_mocks(test_ctx)
ctx = install_mock_curated_data(ctx) ctx = install_mock_curated_data(ctx)
manager = RecommendationManager(ctx.child()) manager = RecommendationManager(ctx.child())
recommendation_list = manager.recommend( raw_list = manager.recommend('111111', 10)
EMPTY_TEST_CLIENT_IDS[0], 10
)
assert len(recommendation_list) == 0 # Clobber the experiment probability to be 100% to force a
# reordering.
ctx["TAAR_EXPERIMENT_PROB"] = 1.0
manager = RecommendationManager(ctx.child())
rand_list = manager.recommend('111111', 10)
"""
The two lists should be :
* different (guid, weight) lists (possibly just order)
* same length
"""
assert (
reduce(
operator.and_,
[
(t1[0] == t2[0] and t1[1] == t2[1])
for t1, t2 in zip(rand_list, raw_list)
],
)
is False
)
assert len(rand_list) == len(raw_list)