Added a force_expiry() method to the JSON loader

Victor Ng 2019-02-20 13:04:32 -05:00
Parent bbb7f8b8ef
Commit c1df5b1efb
2 changed files with 121 additions and 20 deletions

taar/recommenders/lazys3.py

@@ -10,7 +10,7 @@ import time
 class LazyJSONLoader:
     def __init__(self, ctx, s3_bucket, s3_key, ttl=14400):
         self._ctx = ctx
-        self.logger = self._ctx[IMozLogging].get_logger('taar')
+        self.logger = self._ctx[IMozLogging].get_logger("taar")
         self._clock = self._ctx[IClock]
         self._s3_bucket = s3_bucket
@@ -21,13 +21,22 @@
         self._key_str = "{}|{}".format(self._s3_bucket, self._s3_key)
         self._cached_copy = None
-        msg = "Cache expiry of {} is set to TTL of {} seconds".format(self._key_str, self._ttl)
+        msg = "Cache expiry of {} is set to TTL of {} seconds".format(
+            self._key_str, self._ttl
+        )
         self.logger.info(msg)
         self._lock = threading.RLock()
 
         self.logger.info("{} loader is initialized".format(self._key_str))
 
+    def force_expiry(self):
+        msg = "Existing model for {} reset to 0. Model was: {}".format(
+            self._key_str, str(self._cached_copy)
+        )
+        self.logger.info(msg)
+        self._expiry_time = 0
+
     def has_expired(self):
         return self._clock.time() > self._expiry_time
@@ -63,26 +72,22 @@
         raw_bytes = None
         try:
             # We need to force a data reload from S3
-            config = Config(connect_timeout=10, retries={'max_attempts': 3})
-            s3 = boto3.resource('s3', config=config)
+            config = Config(connect_timeout=10, retries={"max_attempts": 3})
+            s3 = boto3.resource("s3", config=config)
             start_load = time.time()
             raw_bytes = (
-                s3
-                .Object(self._s3_bucket, self._s3_key)
-                .get()['Body']
-                .read()
+                s3.Object(self._s3_bucket, self._s3_key).get()["Body"].read()
             )
             end_load = time.time()
-            load_time = (end_load - start_load)
-            msg = "Loaded JSON from S3: {}. Byte count: {:d}. Time to Load: {:0.3f}"
+            load_time = end_load - start_load
+            raw_data = raw_bytes.decode("utf-8")
+            msg = "Loaded S3: {}. Byte count: {:d}. Time to Load: {:0.3f}"
             msg_params = self._key_str, len(raw_bytes), load_time
             self.logger.info(msg.format(*msg_params))
-            raw_data = (
-                raw_bytes.decode('utf-8')
-            )
             # It is possible to have corrupted files in S3, so
             # protect against that.
             try:
@@ -97,9 +102,10 @@
             # requests.
             self._expiry_time = 0
-            self.logger.error("Cannot parse JSON resource from S3", extra={
-                "bucket": self._s3_bucket,
-                "key": self._s3_key})
+            self.logger.error(
+                "Cannot parse JSON resource from S3",
+                extra={"bucket": self._s3_bucket, "key": self._s3_key},
+            )
             return self._cached_copy
 
         except Exception:
@@ -109,7 +115,8 @@
             # requests.
             self._expiry_time = 0
-            self.logger.exception("Failed to download from S3", extra={
-                "bucket": self._s3_bucket,
-                "key": self._s3_key})
+            self.logger.exception(
+                "Failed to download from S3",
+                extra={"bucket": self._s3_bucket, "key": self._s3_key},
+            )
             return self._cached_copy
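
For reference, a minimal sketch of how a caller might drive the new method. The bucket and key names here are placeholders, and ctx is assumed to provide IMozLogging and IClock, as it does in the tests below:

    loader = LazyJSONLoader(ctx, "example-bucket", "example/key.json")

    data, refreshed = loader.get()  # first call fetches from S3; refreshed is truthy
    data, refreshed = loader.get()  # within the TTL: cached copy, refreshed is falsy

    loader.force_expiry()           # zeroes the expiry timestamp
    data, refreshed = loader.get()  # next call re-fetches from S3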

tests/test_lazys3.py (new file, 94 lines)

@@ -0,0 +1,94 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import json
+
+from taar.recommenders.lazys3 import LazyJSONLoader
+
+import boto3
+from moto import mock_s3
+
+from taar.recommenders.s3config import (
+    TAAR_SIMILARITY_BUCKET,
+    TAAR_SIMILARITY_DONOR_KEY,
+)
+
+
+def install_categorical_data(ctx):
+    ctx = ctx.child()
+    conn = boto3.resource("s3", region_name="us-west-2")
+
+    try:
+        conn.create_bucket(Bucket=TAAR_SIMILARITY_BUCKET)
+    except Exception:
+        pass
+    conn.Object(TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_DONOR_KEY).put(
+        Body=json.dumps({"test": "donor_key"})
+    )
+
+    ctx["similarity_donors_pool"] = LazyJSONLoader(
+        ctx, TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_DONOR_KEY
+    )
+
+    return ctx
+
+
+@mock_s3
+def test_does_it_load(test_ctx):
+    ctx = install_categorical_data(test_ctx)
+    jdata, status = ctx["similarity_donors_pool"].get()
+    assert jdata["test"] == "donor_key"
+    check_jdata_status(jdata, status)
+
+
+@mock_s3
+def test_cached_load(test_ctx):
+    ctx = install_categorical_data(test_ctx)
+    loader = ctx["similarity_donors_pool"]
+    jdata, status = loader.get()
+    check_jdata_status(jdata, status)
+    jdata, status = loader.get()
+    assert not status
+
+
+@mock_s3
+def test_reload_on_expiry(test_ctx):
+    ctx = install_categorical_data(test_ctx)
+    loader = ctx["similarity_donors_pool"]
+    jdata, status = loader.get()
+    check_jdata_status(jdata, status)
+    jdata, status = loader.get()
+    assert not status
+    # Force the expiry time to be 10 seconds ago
+    loader._expiry_time = loader._clock.time() - 10
+    jdata, status = loader.get()
+    check_jdata_status(jdata, status)
+
+
+@mock_s3
+def test_force_expiry(test_ctx):
+    ctx = install_categorical_data(test_ctx)
+    loader = ctx["similarity_donors_pool"]
+    jdata, status = loader.get()
+    check_jdata_status(jdata, status)
+    jdata, status = loader.get()
+    assert not status
+    # Expire the cached copy via the new public API
+    loader.force_expiry()
+    jdata, status = loader.get()
+    check_jdata_status(jdata, status)
+
+
+def check_jdata_status(jdata, status):
+    assert jdata == {"test": "donor_key"}
+    assert status
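
These tests assume a test_ctx pytest fixture defined elsewhere in the repository; it is not part of this commit. A minimal sketch of such a fixture, assuming the package exposes a default_context() factory that wires up IMozLogging and IClock (an assumption; swap in the project's real context factory if it differs):

    # conftest.py (hypothetical sketch, not part of this commit)
    import pytest

    from taar.context import default_context  # assumed context factory


    @pytest.fixture
    def test_ctx():
        # A fresh context per test keeps mocked S3 state from leaking.
        return default_context()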