Converted TAAR Collaborative recommender to use redis

This commit is contained in:
Victor Ng 2020-09-01 12:08:07 -04:00
Родитель 1c0d686f32
Коммит daab43c980
5 изменённых файлов: 228 добавлений и 204 удалений

Просмотреть файл

@ -3,38 +3,18 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from srgutil.interfaces import IMozLogging
from .lazys3 import LazyJSONLoader
import numpy as np
import operator as op
import functools
import threading
from .base_recommender import AbstractRecommender
from taar.settings import (
TAAR_ITEM_MATRIX_BUCKET,
TAAR_ITEM_MATRIX_KEY,
TAAR_ADDON_MAPPING_BUCKET,
TAAR_ADDON_MAPPING_KEY,
)
from taar.recommenders.redis_cache import AddonsCoinstallCache
import markus
metrics = markus.get_metrics("taar")
def synchronized(wrapped):
    """
    Decorator that runs a method while holding the instance's lock.

    The first positional argument of the decorated callable is treated
    as the instance, and its `_lock` attribute is used as a context
    manager around the call. The wrapped callable's return value is
    passed through unchanged.
    """

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        # The instance is always the first positional argument.
        instance_lock = args[0]._lock
        with instance_lock:
            result = wrapped(*args, **kwargs)
        return result

    return wrapper
def java_string_hashcode(s):
h = 0
for c in s:
@ -58,31 +38,20 @@ class CollaborativeRecommender(AbstractRecommender):
def __init__(self, ctx):
self._ctx = ctx
self._lock = threading.RLock()
self._addon_mapping = LazyJSONLoader(
self._ctx,
TAAR_ADDON_MAPPING_BUCKET,
TAAR_ADDON_MAPPING_KEY,
"addon_mapping",
)
self._raw_item_matrix = LazyJSONLoader(
self._ctx, TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY, "item_matrix",
)
self.logger = self._ctx[IMozLogging].get_logger("taar")
self._redis_cache = AddonsCoinstallCache(self._ctx)
self.model = None
@property
def addon_mapping(self):
return self._addon_mapping.get()[0]
return self._redis_cache.collab_addon_mapping()
@property
def raw_item_matrix(self):
val, new_copy = self._raw_item_matrix.get()
if val is not None and new_copy:
val = self._redis_cache.collab_raw_item_matrix()
if val not in (None, ""):
# Build a dense numpy matrix out of it.
num_rows = len(val)
num_cols = len(val[0]["features"])
@ -90,27 +59,10 @@ class CollaborativeRecommender(AbstractRecommender):
self.model = np.zeros(shape=(num_rows, num_cols))
for index, row in enumerate(val):
self.model[index, :] = row["features"]
elif val is None and new_copy:
else:
self.model = None
return val
def _load_json_models(self):
    """
    Log an error for each collaborative model input that is missing.

    Reads the `addon_mapping` and `raw_item_matrix` properties and, for
    each one that is None, writes an error to `self.logger` naming the
    bucket and key the data was expected to come from. This method
    returns nothing.
    """
    # Report a missing addon mapping.
    if self.addon_mapping is None:
        self.logger.error(
            "Cannot download the addon mapping file {} {}".format(
                TAAR_ADDON_MAPPING_BUCKET, TAAR_ADDON_MAPPING_KEY
            )
        )

    # Report a missing item matrix. This must test the matrix itself:
    # testing addon_mapping a second time would never flag a missing
    # model file even though the message below refers to it.
    if self.raw_item_matrix is None:
        self.logger.error(
            "Cannot download the model file {} {}".format(
                TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY
            )
        )
@synchronized
def can_recommend(self, client_data, extra_data={}):
# We can't recommend if we don't have our data files.
if (
@ -178,22 +130,18 @@ class CollaborativeRecommender(AbstractRecommender):
@metrics.timer_decorator("collaborative_recommend")
def recommend(self, client_data, limit, extra_data={}):
# Addons identifiers are stored as positive hash values within the model.
with self._lock:
try:
recommendations = self._recommend(client_data, limit, extra_data)
except Exception as e:
recommendations = []
try:
recommendations = self._recommend(client_data, limit, extra_data)
except Exception as e:
recommendations = []
self._addon_mapping.force_expiry()
self._raw_item_matrix.force_expiry()
metrics.incr("error_collaborative", value=1)
self.logger.exception(
"Collaborative recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
metrics.incr("error_collaborative", value=1)
self.logger.exception(
"Collaborative recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
log_data = (
client_data["client_id"],

Просмотреть файл

@ -24,7 +24,14 @@ from taar.settings import (
)
# TAARLite configuration
from taar.settings import TAAR_LOCALE_BUCKET, TAAR_LOCALE_KEY
from taar.settings import (
TAAR_LOCALE_BUCKET,
TAAR_LOCALE_KEY,
TAAR_ADDON_MAPPING_BUCKET,
TAAR_ADDON_MAPPING_KEY,
TAAR_ITEM_MATRIX_BUCKET,
TAAR_ITEM_MATRIX_KEY,
)
from jsoncache.loader import s3_json_loader
@ -64,6 +71,10 @@ NORMDATA_GUID_ROW_NORM_PREFIX = "normdata_guid_row_norm_prefix|"
# TAAR: Locale data
LOCALE_DATA = "taar_locale_data|"
# TAAR: collaborative data
COLLAB_MAPPING_DATA = "taar_collab_mapping|"
COLLAB_ITEM_MATRIX = "taar_collab_item_matrix|"
class PrefixStripper:
def __init__(self, prefix, iterator, cast_to_str=False):
@ -239,11 +250,32 @@ class AddonsCoinstallCache:
return self._r0.get(ACTIVE_DB) is not None
def top_addons_per_locale(self):
    """
    Return the JSON-decoded value stored under LOCALE_DATA in the
    database returned by `self._db()`, or None when the key is absent
    or holds an empty value.
    """
    raw = self._db().get(LOCALE_DATA)
    if not raw:
        return None
    return json.loads(raw.decode("utf8"))
def collab_raw_item_matrix(self):
    """
    Return the TAAR collaborative item matrix.

    The value stored under COLLAB_ITEM_MATRIX is read from the database
    returned by `self._db()`, decoded as UTF-8 and parsed as JSON.
    None is returned when the key is absent or holds an empty value.
    """
    payload = self._db().get(COLLAB_ITEM_MATRIX)
    return json.loads(payload.decode("utf8")) if payload else None
def collab_addon_mapping(self):
    """
    Return the TAAR collaborative addon mapping.

    The value stored under COLLAB_MAPPING_DATA is read from the
    database returned by `self._db()`, decoded as UTF-8 and parsed as
    JSON. None is returned when the key is absent or holds an empty
    value.
    """
    encoded = self._db().get(COLLAB_MAPPING_DATA)
    if encoded:
        text = encoded.decode("utf8")
        return json.loads(text)
    return None
"""
################################
@ -281,6 +313,25 @@ class AddonsCoinstallCache:
def _fetch_locale_data(self):
    """
    Return whatever `s3_json_loader` yields for the TAAR locale
    bucket/key pair.
    """
    bucket, key = TAAR_LOCALE_BUCKET, TAAR_LOCALE_KEY
    return s3_json_loader(bucket, key)
def _fetch_collaborative_mapping_data(self):
    """
    Return whatever `s3_json_loader` yields for the collaborative
    addon-mapping bucket/key pair.
    """
    bucket, key = TAAR_ADDON_MAPPING_BUCKET, TAAR_ADDON_MAPPING_KEY
    return s3_json_loader(bucket, key)
def _fetch_collaborative_item_matrix(self):
    """
    Return whatever `s3_json_loader` yields for the collaborative
    item-matrix bucket/key pair.
    """
    bucket, key = TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY
    return s3_json_loader(bucket, key)
def _update_collab_data(self, db):
    """
    Store the TAAR collaborative data in `db`.

    Two values are written, each serialized with `json.dumps`: the item
    matrix under COLLAB_ITEM_MATRIX, then the addon mapping under
    COLLAB_MAPPING_DATA. Each value is fetched immediately before it is
    stored.
    """
    # Order matters: item matrix first, then the mapping data.
    sources = (
        (COLLAB_ITEM_MATRIX, self._fetch_collaborative_item_matrix),
        (COLLAB_MAPPING_DATA, self._fetch_collaborative_mapping_data),
    )
    for redis_key, fetch in sources:
        db.set(redis_key, json.dumps(fetch()))
def _update_locale_data(self, db):
"""
Load the TAAR locale data
@ -395,5 +446,8 @@ class AddonsCoinstallCache:
# Clear this database before we do anything with it
db.flushdb()
self._update_rank_data(db)
self._update_coinstall_data(db)
self._update_locale_data(db)
self._update_collab_data(db)

Просмотреть файл

@ -8,22 +8,21 @@ Test cases for the TAAR CollaborativeRecommender
import numpy
from moto import mock_s3
import boto3
from taar.recommenders.collaborative_recommender import (
TAAR_ITEM_MATRIX_BUCKET,
TAAR_ITEM_MATRIX_KEY,
TAAR_ADDON_MAPPING_BUCKET,
TAAR_ADDON_MAPPING_KEY,
)
import fakeredis
import mock
import contextlib
from taar.recommenders.redis_cache import AddonsCoinstallCache
from taar.recommenders.collaborative_recommender import CollaborativeRecommender
from taar.recommenders.collaborative_recommender import positive_hash
import json
from markus import TIMING
from markus.testing import MetricsMock
from .test_localerecommender import noop_taarlite_dataload
from .noop_fixtures import noop_taarlocale_dataload
"""
We need to generate a synthetic list of addons and relative weights
@ -33,29 +32,51 @@ the Java hash function.
"""
def install_none_mock_data(ctx):
@contextlib.contextmanager
def mock_install_none_mock_data(ctx):
"""
Overload the 'real' addon model and mapping URLs responses so that
we always get 404 errors.
"""
conn = boto3.resource("s3", region_name="us-west-2")
with contextlib.ExitStack() as stack:
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"_fetch_collaborative_item_matrix",
return_value="",
)
)
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"_fetch_collaborative_mapping_data",
return_value="",
)
)
conn.create_bucket(Bucket=TAAR_ITEM_MATRIX_BUCKET)
conn.Object(TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY).put(Body="")
stack = noop_taarlocale_dataload(stack)
stack = noop_taarlite_dataload(stack)
# Don't reuse connections with moto. badness happens
conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=TAAR_ADDON_MAPPING_BUCKET)
conn.Object(TAAR_ADDON_MAPPING_BUCKET, TAAR_ADDON_MAPPING_KEY).put(Body="")
return ctx
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
1: fakeredis.FakeStrictRedis(db=1),
2: fakeredis.FakeStrictRedis(db=2),
},
)
)
# Initialize redis
AddonsCoinstallCache(ctx).safe_load_data()
yield stack
def install_mock_data(ctx):
"""
Overload the 'real' addon model and mapping URLs responses so that
we always the fixture data at the top of this test module.
"""
@contextlib.contextmanager
def mock_install_mock_data(ctx):
addon_space = [
{"id": "addon1.id", "name": "addon1.name", "isWebextension": True},
{"id": "addon2.id", "name": "addon2.name", "isWebextension": True},
@ -66,7 +87,10 @@ def install_mock_data(ctx):
fake_addon_matrix = []
for i, addon in enumerate(addon_space):
row = {"id": positive_hash(addon["id"]), "features": [0, 0.2, 0.0, 0.1, 0.15]}
row = {
"id": positive_hash(addon["id"]),
"features": [0, 0.2, 0.0, 0.1, 0.15],
}
row["features"][i] = 1.0
fake_addon_matrix.append(row)
@ -75,74 +99,124 @@ def install_mock_data(ctx):
java_hash = positive_hash(addon["id"])
fake_mapping[str(java_hash)] = addon
conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=TAAR_ITEM_MATRIX_BUCKET)
conn.Object(TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY).put(
Body=json.dumps(fake_addon_matrix)
)
with contextlib.ExitStack() as stack:
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"_fetch_collaborative_item_matrix",
return_value=fake_addon_matrix,
)
)
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"_fetch_collaborative_mapping_data",
return_value=fake_mapping,
)
)
conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=TAAR_ADDON_MAPPING_BUCKET)
conn.Object(TAAR_ADDON_MAPPING_BUCKET, TAAR_ADDON_MAPPING_KEY).put(
Body=json.dumps(fake_mapping)
)
stack = noop_taarlocale_dataload(stack)
stack = noop_taarlite_dataload(stack)
return ctx
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
AddonsCoinstallCache,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
1: fakeredis.FakeStrictRedis(db=1),
2: fakeredis.FakeStrictRedis(db=2),
},
)
)
# Initialize redis
AddonsCoinstallCache(ctx).safe_load_data()
yield stack
@mock_s3
def test_cant_recommend(test_ctx):
ctx = install_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# Test that we can't recommend if we have not enough client info.
assert not r.can_recommend({})
assert not r.can_recommend({"installed_addons": []})
# Test that we can't recommend if we have not enough client info.
assert not r.can_recommend({})
assert not r.can_recommend({"installed_addons": []})
@mock_s3
def test_can_recommend(test_ctx):
ctx = install_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# For some reason, moto doesn't like to play nice with this call
# Check that we can recommend if we the user has at least an addon.
assert r.can_recommend(
{"installed_addons": ["uBlock0@raymondhill.net"], "client_id": "test-client"}
)
# For some reason, moto doesn't like to play nice with this call
# Check that we can recommend if we the user has at least an addon.
assert r.can_recommend(
{
"installed_addons": ["uBlock0@raymondhill.net"],
"client_id": "test-client",
}
)
@mock_s3
def test_can_recommend_no_model(test_ctx):
ctx = install_none_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
with mock_install_none_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# We should never be able to recommend if something went wrong with the model.
assert not r.can_recommend({})
assert not r.can_recommend({"installed_addons": []})
assert not r.can_recommend({"installed_addons": ["uBlock0@raymondhill.net"]})
# We should never be able to recommend if something went wrong with the model.
assert not r.can_recommend({})
assert not r.can_recommend({"installed_addons": []})
assert not r.can_recommend({"installed_addons": ["uBlock0@raymondhill.net"]})
@mock_s3
def test_empty_recommendations(test_ctx):
# Tests that the empty recommender always recommends an empty list
# of addons if we have no addons
ctx = install_none_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
assert not r.can_recommend({})
with mock_install_none_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
assert not r.can_recommend({})
# Note that calling recommend() if can_recommend has failed is not
# defined.
# Note that calling recommend() if can_recommend has failed is not
# defined.
@mock_s3
def test_best_recommendation(test_ctx):
with MetricsMock() as mm:
# Make sure the structure of the recommendations is correct and that we
# recommended the the right addon.
ctx = install_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# An non-empty set of addons should give a list of recommendations
fixture_client_data = {
"installed_addons": ["addon4.id"],
"client_id": "test_client",
}
assert r.can_recommend(fixture_client_data)
recommendations = r.recommend(fixture_client_data, 1)
assert isinstance(recommendations, list)
assert len(recommendations) == 1
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[0]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon2.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.3225"))
assert mm.has_record(TIMING, stat="taar.collaborative_recommend")
def test_recommendation_weights(test_ctx):
"""
Weights should be ordered greatest to lowest
"""
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# An non-empty set of addons should give a list of recommendations
fixture_client_data = {
@ -150,10 +224,9 @@ def test_best_recommendation(test_ctx):
"client_id": "test_client",
}
assert r.can_recommend(fixture_client_data)
recommendations = r.recommend(fixture_client_data, 1)
recommendations = r.recommend(fixture_client_data, 2)
assert isinstance(recommendations, list)
assert len(recommendations) == 1
assert len(recommendations) == 2
# Verify that addon2 - the most heavy weighted addon was
# recommended
@ -164,43 +237,11 @@ def test_best_recommendation(test_ctx):
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.3225"))
assert mm.has_record(TIMING, stat="taar.item_matrix")
assert mm.has_record(TIMING, stat="taar.addon_mapping")
assert mm.has_record(TIMING, stat="taar.collaborative_recommend")
@mock_s3
def test_recommendation_weights(test_ctx):
"""
Weights should be ordered greatest to lowest
"""
ctx = install_mock_data(test_ctx)
r = CollaborativeRecommender(ctx)
# An non-empty set of addons should give a list of recommendations
fixture_client_data = {
"installed_addons": ["addon4.id"],
"client_id": "test_client",
}
assert r.can_recommend(fixture_client_data)
recommendations = r.recommend(fixture_client_data, 2)
assert isinstance(recommendations, list)
assert len(recommendations) == 2
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[0]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon2.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.3225"))
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[1]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon5.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.29"))
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[1]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon5.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.29"))

Просмотреть файл

@ -5,6 +5,8 @@ import pytest
import mock
import contextlib
from .noop_fixtures import noop_taarlocale_dataload, noop_taarcollab_dataload
from taar.recommenders.guid_based_recommender import GuidBasedRecommender
from taar.recommenders.redis_cache import AddonsCoinstallCache
@ -81,16 +83,6 @@ RESULTS = {
}
def noop_taarlocale_dataload(stack):
    """
    Enter a patch on `stack` that replaces
    `AddonsCoinstallCache._update_locale_data` with a mock returning
    None, then hand the same `stack` back to the caller.
    """
    locale_patch = mock.patch.object(
        AddonsCoinstallCache, "_update_locale_data", return_value=None
    )
    stack.enter_context(locale_patch)
    return stack
@contextlib.contextmanager
def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
@ -109,6 +101,7 @@ def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
)
stack = noop_taarlocale_dataload(stack)
stack = noop_taarcollab_dataload(stack)
# Patch fakeredis in
stack.enter_context(

Просмотреть файл

@ -8,6 +8,7 @@ import mock
import contextlib
import fakeredis
from taar.recommenders.redis_cache import AddonsCoinstallCache
from .noop_fixtures import noop_taarcollab_dataload, noop_taarlite_dataload
import json
@ -42,20 +43,6 @@ def install_mock_data(ctx):
return ctx
def noop_taarlite_dataload(stack):
    """
    Enter patches on `stack` that replace the rank-data and
    coinstall-data update methods of `AddonsCoinstallCache` with mocks
    returning None, then hand the same `stack` back to the caller.
    """
    # Patched in this order: rank data first, then guid-guid coinstalls.
    for method_name in ("_update_rank_data", "_update_coinstall_data"):
        stack.enter_context(
            mock.patch.object(AddonsCoinstallCache, method_name, return_value=None)
        )
    return stack
@contextlib.contextmanager
def mock_locale_data(ctx):
with contextlib.ExitStack() as stack:
@ -68,6 +55,7 @@ def mock_locale_data(ctx):
)
stack = noop_taarlite_dataload(stack)
stack = noop_taarcollab_dataload(stack)
# Patch fakeredis in
stack.enter_context(