TAARlite returns empty lists if the cache is empty

This commit is contained in:
Victor Ng 2020-08-27 16:49:19 -04:00
Родитель d2e711aad5
Коммит 0047fbbb4e
4 изменённых файлов: 163 добавлений и 190 удалений

Просмотреть файл

@ -16,16 +16,14 @@ from taar.settings import (
TAARLITE_GUID_RANKING_KEY,
TAARLITE_TTL,
TAARLITE_TRUNCATE,
TAARLITE_UPDATE_POLL,
TAARLITE_MUTEX_TTL,
)
import time
from jsoncache.loader import s3_json_loader
ACTIVE_DB = "active_db"
UPDATE_CHECK = "update_id_check"
UPDATE_CHECK = "update_mutex|"
COINSTALL_PREFIX = "coinstall|"
@ -42,9 +40,6 @@ NORMDATA_ROWCOUNT_PREFIX = "normdata_rowcount_prefix|"
NORMDATA_GUID_ROW_NORM_PREFIX = "normdata_guid_row_norm_prefix|"
NEXT_UPDATE_MUTEX = "next_update_mutex|"
NEXT_UPDATE_TIME = "next_update_time|"
class PrefixStripper:
def __init__(self, prefix, iterator, cast_to_str=False):
@ -80,63 +75,27 @@ class AddonsCoinstallCache:
self._r1 = rcon[1]
self._r2 = rcon[2]
if self._db() is None:
self.safe_load_data()
def reset(self):
    """Flush redis db 0, which holds the cache bookkeeping keys.

    Returns whatever ``flushdb()`` on the ``_r0`` connection returns.
    """
    bookkeeping = self._r0
    return bookkeeping.flushdb()
self.wait_for_data()
self.start_update_thread()
def start_update_thread(self):
    """Start a daemon thread running ``_update_data_target``.

    The thread object is kept on ``self._update_thread``.
    """
    worker = threading.Thread(target=self._update_data_target, daemon=True)
    self._update_thread = worker
    worker.start()
def _update_data_target(self):
    """Body of the update thread: poll redis and refresh the cache when due.

    Each pass tries to take NEXT_UPDATE_MUTEX (set with ``nx=True`` and a
    TAARLITE_MUTEX_TTL expiry), reads or initialises NEXT_UPDATE_TIME,
    and calls ``safe_load_data()`` once that time has passed.  The mutex
    is deleted again before sleeping TAARLITE_UPDATE_POLL seconds.
    """
    while True:
        me = self._ident
        self.logger.info(f"TAARLite update is alive. ident={me}")
        try:
            self.logger.debug(f"Trying to acquire lock {me}")
            self._r0.set(NEXT_UPDATE_MUTEX, me, nx=True, ex=TAARLITE_MUTEX_TTL)

            # Read the mutex back to find out who actually holds it.
            holder = self._r0.get(NEXT_UPDATE_MUTEX)
            # NOTE(review): returning here leaves the ``while`` loop, so
            # this thread stops polling for good once another holder is
            # seen (or the key has already vanished) - confirm that this
            # is intended rather than "skip this round".
            if holder is None or holder.decode("utf8") != me:
                return

            self.logger.debug(f"Acquired lock {me}")
            try:
                if self._r0.get(NEXT_UPDATE_TIME) is None:
                    # First run: schedule the refresh one TTL from now.
                    self._r0.set(NEXT_UPDATE_TIME, time.time() + TAARLITE_TTL)

                deadline = float(self._r0.get(NEXT_UPDATE_TIME))
                self.logger.info(
                    f"Next TAARlite refresh is in {deadline - time.time()} seconds"
                )

                if deadline <= time.time():
                    # The scheduled refresh time has passed.
                    self.safe_load_data()
            finally:
                self._r0.delete(NEXT_UPDATE_MUTEX)
                self.logger.debug(f"Released lock {me}")
        except Exception:
            self.logger.exception("Error while updating GUID GUID cache")

        time.sleep(TAARLITE_UPDATE_POLL)
@property
def _ident(self):
    """Identity string of the form ``<pid>_<thread ident>``."""
    pid = os.getpid()
    thread_id = threading.get_ident()
    return f"{pid}_{thread_id}"
def info(self):
    """Log every key/value pair found in redis db 0 (the bookkeeping db).

    Keys and values are decoded as UTF-8 and passed to the logger as
    ``extra``; a separate message is logged when no keys exist.
    """
    store = self._r0
    snapshot = {
        raw_key.decode("utf8"): store.get(raw_key).decode("utf8")
        for raw_key in store.scan_iter()
    }

    if snapshot:
        self.logger.info("TAARLite cache info", extra=snapshot)
    else:
        self.logger.info("Bookkeeping data for TAARLite cache was empty")
def init_redis_connections(self):
"""
Bind connections to redis databases. This sits in its own
method to enable mocking for tests
"""
return {
0: redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0),
1: redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=1),
@ -164,6 +123,9 @@ class AddonsCoinstallCache:
# thread can update redis
update_ident = self._r0.get(UPDATE_CHECK).decode("utf8")
if update_ident != self._ident:
self.logger.info(
"Cache update lock has already been acquired by another process"
)
return
# We're past the thread barrier - load the data and clear the
@ -174,57 +136,9 @@ class AddonsCoinstallCache:
self._r0.delete(UPDATE_CHECK)
self.logger.info("UPDATE_CHECK field is cleared")
def _load_data(self):
    """Fill the inactive redis database and then point ACTIVE_DB at it.

    Database 2 is chosen when ACTIVE_DB currently holds 1; in every
    other case (including no pointer at all) database 1 is chosen.
    """
    current = self._r0.get(ACTIVE_DB)
    next_active_db = 2 if (current is not None and int(current) == 1) else 1

    self._copy_data(next_active_db)
    self.logger.info("Completed precomputing normalized data")

    # TODO: should this autoexpire to help indicate that no fresh
    # data has loaded?  Maybe N * update TTL time?
    self._r0.set(ACTIVE_DB, next_active_db)
    self.logger.info(f"Active DB is set to {next_active_db}")
def _copy_data(self, next_active_db):
    """Flush the chosen redis database and load rank and coinstall data.

    ``next_active_db == 1`` selects ``self._r1``; any other value
    selects ``self._r2``.
    """
    target = self._r1 if next_active_db == 1 else self._r2

    # Start from an empty database so no stale keys survive the reload.
    target.flushdb()

    self._update_rank_data(target)
    self._update_coinstall_data(target)
def fetch_ranking_data(self):
    """Load the GUID ranking JSON via ``s3_json_loader``.

    Reads key TAARLITE_GUID_RANKING_KEY from bucket
    TAARLITE_GUID_COINSTALL_BUCKET.
    """
    bucket, key = TAARLITE_GUID_COINSTALL_BUCKET, TAARLITE_GUID_RANKING_KEY
    return s3_json_loader(bucket, key)
def _update_rank_data(self, db):
    """Write the fetched ranking data into ``db``.

    Every ``guid -> count`` pair is stored JSON-encoded under
    ``RANKING_PREFIX + guid``; afterwards 5% of the mean count is
    stored under MIN_INSTALLS_PREFIX.
    """
    ranking = self.fetch_ranking_data()
    total = len(ranking.items())

    for loaded, (guid, count) in enumerate(ranking.items(), start=1):
        db.set(RANKING_PREFIX + guid, json.dumps(count))

        # Progress message on the first item and every 1000th after it.
        if (loaded - 1) % 1000 == 0:
            self.logger.info(f"Loaded {loaded} of {total} GUID ranking into redis")

    counts = list(ranking.values())
    min_installs = np.mean(counts) * 0.05
    db.set(MIN_INSTALLS_PREFIX, min_installs)
    self.logger.info(f"Updated MIN_INSTALLS: {min_installs}")
def guid_maps_count_map(self, guid, default=None):
tmp = self._db().get(NORMDATA_COUNT_MAP_PREFIX + guid)
if tmp:
@ -258,6 +172,78 @@ class AddonsCoinstallCache:
TAARLITE_GUID_COINSTALL_BUCKET, TAARLITE_GUID_COINSTALL_KEY
)
def get_filtered_coinstall(self, guid, default=None):
    """Return the filtered coinstall map for ``guid``, truncated.

    The JSON object stored under ``FILTERED_COINSTALL_PREFIX + guid`` is
    sorted by value in descending order and cut to the first
    TAARLITE_TRUNCATE entries.

    :param guid: key suffix to look up
    :param default: value returned when nothing can be read
    :return: a dict, or ``default`` when there is no active database
        or no (non-empty) value is stored for ``guid``
    """
    db = self._db()
    if db is None:
        # _db() yields None while no ACTIVE_DB pointer is set; treat
        # that as "no data" rather than failing on None.get().
        return default

    tmp = db.get(FILTERED_COINSTALL_PREFIX + guid)
    if not tmp:
        return default

    raw_dict = json.loads(tmp.decode("utf8"))
    # This truncates the size of the coinstall list for
    # performance reasons
    ranked = sorted(raw_dict.items(), key=lambda x: x[1], reverse=True)
    return dict(ranked[:TAARLITE_TRUNCATE])
def get_rankings(self, guid, default=None):
    """Return the decoded JSON value stored under ``RANKING_PREFIX + guid``.

    :param guid: key suffix to look up
    :param default: value returned when nothing can be read
    :return: the decoded value, or ``default`` when there is no active
        database or no (non-empty) value is stored for ``guid``
    """
    db = self._db()
    if db is None:
        # _db() yields None while no ACTIVE_DB pointer is set; treat
        # that as "no data" rather than failing on None.get().
        return default

    tmp = db.get(RANKING_PREFIX + guid)
    if tmp:
        return json.loads(tmp.decode("utf8"))
    return default
def has_coinstalls_for(self, guid):
    """Return True when a value exists under ``COINSTALL_PREFIX + guid``.

    Returns False when there is no active database, because ``_db()``
    yields None until an ACTIVE_DB pointer is set.
    """
    db = self._db()
    if db is None:
        return False
    return db.get(COINSTALL_PREFIX + guid) is not None
def get_coinstalls(self, guid, default=None):
    """Return the coinstallation map (GUID -> install count) for ``guid``.

    The map is the decoded JSON value stored under
    ``COINSTALL_PREFIX + guid``.

    :param guid: addon GUID to look up
    :param default: value returned when nothing can be read
    :return: the decoded map, or ``default`` when there is no active
        database or no (non-empty) value is stored for ``guid``
    """
    db = self._db()
    if db is None:
        # _db() yields None while no ACTIVE_DB pointer is set; treat
        # that as "no data" rather than failing on None.get().
        return default

    tmp = db.get(COINSTALL_PREFIX + guid)
    if tmp:
        return json.loads(tmp.decode("utf8"))
    return default
def key_iter_ranking(self):
    """Return a ``PrefixStripper`` over the ranking keys of the active db.

    Keys are scanned with the pattern ``RANKING_PREFIX + "*"``.  When
    there is no active database (``_db()`` yields None) the stripper is
    built over an empty iterator, so callers see no keys instead of an
    AttributeError.
    """
    db = self._db()
    if db is None:
        keys = iter(())
    else:
        keys = db.scan_iter(match=RANKING_PREFIX + "*")
    return PrefixStripper(RANKING_PREFIX, keys)
def key_iter_coinstall(self):
    """Return a ``PrefixStripper`` over the coinstall keys of the active db.

    Keys are scanned with the pattern ``COINSTALL_PREFIX + "*"``.  When
    there is no active database (``_db()`` yields None) the stripper is
    built over an empty iterator, so callers see no keys instead of an
    AttributeError.
    """
    db = self._db()
    if db is None:
        keys = iter(())
    else:
        keys = db.scan_iter(match=COINSTALL_PREFIX + "*")
    return PrefixStripper(COINSTALL_PREFIX, keys)
def is_active(self):
    """Report whether an ACTIVE_DB pointer exists in redis db 0.

    Any stored value counts as "data is live".
    """
    pointer = self._r0.get(ACTIVE_DB)
    return pointer is not None
# Private methods below
def _db(self):
    """Resolve the ACTIVE_DB pointer in redis db 0 to a connection.

    Returns ``self._r1`` for pointer value 1 and ``self._r2`` for 2.
    Returns None when the pointer is missing or holds any other
    integer.
    """
    pointer = self._r0.get(ACTIVE_DB)
    if pointer is None:
        return None

    selected = int(pointer.decode("utf8"))
    if selected == 1:
        return self._r1
    if selected == 2:
        return self._r2
    return None
@property
def _ident(self):
    """Identity string of the form ``<pid>_<thread ident>``."""
    pid = os.getpid()
    thread_id = threading.get_ident()
    return f"{pid}_{thread_id}"
def _update_coinstall_data(self, db):
data = self.fetch_coinstall_data()
@ -314,75 +300,50 @@ class AddonsCoinstallCache:
)
self.logger.info("finished saving guidmaps to redis")
def get_filtered_coinstall(self, guid, default=None):
    """Return the filtered coinstall map for ``guid``, truncated.

    The JSON object stored under ``FILTERED_COINSTALL_PREFIX + guid`` is
    sorted by value in descending order and cut to the first
    TAARLITE_TRUNCATE entries.

    :param guid: key suffix to look up
    :param default: value returned when nothing can be read
    :return: a dict, or ``default`` when there is no active database
        or no (non-empty) value is stored for ``guid``
    """
    db = self._db()
    if db is None:
        # _db() can yield None (no usable ACTIVE_DB pointer); treat
        # that as "no data" rather than failing on None.get().
        return default

    tmp = db.get(FILTERED_COINSTALL_PREFIX + guid)
    if not tmp:
        return default

    raw_dict = json.loads(tmp.decode("utf8"))
    # This truncates the size of the coinstall list for
    # performance reasons
    ranked = sorted(raw_dict.items(), key=lambda x: x[1], reverse=True)
    return dict(ranked[:TAARLITE_TRUNCATE])
def _update_rank_data(self, db):
def get_rankings(self, guid, default=None):
"""
Return the rankings
"""
tmp = self._db().get(RANKING_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
data = self.fetch_ranking_data()
def has_coinstalls_for(self, guid):
return self._db().get(COINSTALL_PREFIX + guid) is not None
items = data.items()
len_items = len(items)
def get_coinstalls(self, guid, default=None):
"""
Return a map of GUID:install count that represents the
coinstallation map for a particular addon GUID
"""
tmp = self._db().get(COINSTALL_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
for i, (guid, count) in enumerate(items):
db.set(RANKING_PREFIX + guid, json.dumps(count))
def key_iter_ranking(self):
return PrefixStripper(
RANKING_PREFIX, self._db().scan_iter(match=RANKING_PREFIX + "*")
)
if i % 1000 == 0:
self.logger.info(f"Loaded {i+1} of {len_items} GUID ranking into redis")
def key_iter_coinstall(self):
return PrefixStripper(
COINSTALL_PREFIX, self._db().scan_iter(match=COINSTALL_PREFIX + "*")
)
min_installs = np.mean(list(data.values())) * 0.05
db.set(MIN_INSTALLS_PREFIX, min_installs)
self.logger.info(f"Updated MIN_INSTALLS: {min_installs}")
def is_active(self):
    """Report whether an ACTIVE_DB pointer exists in redis db 0.

    Any stored value counts as "data is live".
    """
    pointer = self._r0.get(ACTIVE_DB)
    return pointer is not None
def wait_for_data(self):
    """Block until ``is_active()`` returns True, checking once per second."""
    while not self.is_active():
        self.logger.info("waiting for data. spinlock active")
        time.sleep(1)
    self.logger.info("finished waiting for data")
def _db(self):
"""
This dereferences the ACTIVE_DB pointer to get the current
active redis instance
"""
def _load_data(self):
active_db = self._r0.get(ACTIVE_DB)
if active_db is not None:
db = int(active_db.decode("utf8"))
if db == 1:
return self._r1
elif db == 2:
return self._r2
active_db = int(active_db.decode("utf8"))
if active_db == 1:
next_active_db = 2
else:
next_active_db = 1
else:
next_active_db = 1
self._copy_data(next_active_db)
self.logger.info("Completed precomputing normalized data")
# TODO: should this autoexpire to help indicate that no fresh
# data has loaded? Maybe N * update TTL time?
self._r0.set(ACTIVE_DB, next_active_db)
self.logger.info(f"Active DB is set to {next_active_db}")
def _copy_data(self, next_active_db):
    """Flush the chosen redis database and load rank and coinstall data.

    ``next_active_db == 1`` selects ``self._r1``; any other value
    selects ``self._r2``.
    """
    target = self._r1 if next_active_db == 1 else self._r2

    # Start from an empty database so no stale keys survive the reload.
    target.flushdb()

    self._update_rank_data(target)
    self._update_coinstall_data(target)

Просмотреть файл

@ -69,9 +69,6 @@ TAARLITE_TTL = config("TAARLITE_TTL", 60 * 60 * 4, cast=int)
TAARLITE_MUTEX_TTL = config("TAARLITE_MUTEX_TTL", 60 * 60, cast=int)
# Poll for TTL expiry every 60 seconds
TAARLITE_UPDATE_POLL = config("TAARLITE_UPDATE_POLL", 60, cast=int)
# Set a default TAARLite mutex TTL of 1 hour to fully populate the
# redis cache
TAARLITE_MUTEX_TTL = config("TAARLITE_MUTEX_TTL", 60 * 60, cast=int)

Просмотреть файл

@ -82,7 +82,8 @@ RESULTS = {
@contextlib.contextmanager
def mock_coinstall_ranking_context(mock_coinstall, mock_ranking):
def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
with contextlib.ExitStack() as stack:
stack.enter_context(
mock.patch.object(
@ -103,18 +104,22 @@ def mock_coinstall_ranking_context(mock_coinstall, mock_ranking):
AddonsCoinstallCache,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(),
1: fakeredis.FakeStrictRedis(),
2: fakeredis.FakeStrictRedis(),
0: fakeredis.FakeStrictRedis(db=0),
1: fakeredis.FakeStrictRedis(db=1),
2: fakeredis.FakeStrictRedis(db=2),
},
)
)
# Initialize redis
AddonsCoinstallCache(ctx).safe_load_data()
yield stack
def test_recommender_nonormal(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
with MetricsMock() as mm:
EXPECTED_RESULTS = RESULTS["default"]
recommender = GuidBasedRecommender(test_ctx)
@ -132,7 +137,9 @@ def test_recommender_nonormal(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_R
def test_row_count_recommender(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
EXPECTED_RESULTS = RESULTS["row_count"]
@ -147,7 +154,9 @@ def test_row_count_recommender(
def test_rownorm_sumrownorm(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
EXPECTED_RESULTS = RESULTS["rownorm_sum"]
recommender = GuidBasedRecommender(test_ctx)
guid = "guid-2"
@ -184,7 +193,9 @@ def test_rownorm_sumrownorm(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RAN
def test_rowsum_recommender(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
EXPECTED_RESULTS = RESULTS["row_sum"]
recommender = GuidBasedRecommender(test_ctx)
@ -201,7 +212,9 @@ def test_rowsum_recommender(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RAN
def test_guidception(test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
EXPECTED_RESULTS = RESULTS["guidception"]
@ -216,7 +229,7 @@ def test_rownorm_sum_tiebreak(
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
with mock_coinstall_ranking_context(
TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
EXPECTED_RESULTS = RESULTS["rownorm_sum_tiebreak"]
@ -234,7 +247,7 @@ def test_missing_rownorm_data_issue_31(
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
with mock_coinstall_ranking_context(
TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
recommender = GuidBasedRecommender(test_ctx)
@ -259,7 +272,7 @@ def test_divide_by_zero_rownorm_data_issue_31(
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
with mock_coinstall_ranking_context(
TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
test_ctx, TAARLITE_TIE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
recommender = GuidBasedRecommender(test_ctx)

Просмотреть файл

@ -325,12 +325,14 @@ def test_client_has_no_addon(client, profile_enabled_rm):
assert res.json["results"] is False
def test_taarlite(client, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING, test_ctx):
def test_taarlite(client, test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
"""
Check that the result size of taarlite is TAARLITE_MAX_RESULTS
"""
with mock_coinstall_ranking_context(TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING):
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
url = url_for("taarlite_recommendations", guid="guid-1",)
res = client.get(url, follow_redirects=True)