diff --git a/taar/recommenders/lazys3.py b/taar/recommenders/lazys3.py index 69fb356..aec01f5 100644 --- a/taar/recommenders/lazys3.py +++ b/taar/recommenders/lazys3.py @@ -10,7 +10,7 @@ import time class LazyJSONLoader: def __init__(self, ctx, s3_bucket, s3_key, ttl=14400): self._ctx = ctx - self.logger = self._ctx[IMozLogging].get_logger('taar') + self.logger = self._ctx[IMozLogging].get_logger("taar") self._clock = self._ctx[IClock] self._s3_bucket = s3_bucket @@ -21,13 +21,22 @@ class LazyJSONLoader: self._key_str = "{}|{}".format(self._s3_bucket, self._s3_key) self._cached_copy = None - msg = "Cache expiry of {} is set to TTL of {} seconds".format(self._key_str, self._ttl) + msg = "Cache expiry of {} is set to TTL of {} seconds".format( + self._key_str, self._ttl + ) self.logger.info(msg) self._lock = threading.RLock() self.logger.info("{} loader is initialized".format(self._key_str)) + def force_expiry(self): + msg = "Existing model for {} reset to 0. Model was:".format( + self._key_str, str(self._cached_copy) + ) + self.logger.info(msg) + self._expiry_time = 0 + def has_expired(self): return self._clock.time() > self._expiry_time @@ -63,26 +72,22 @@ class LazyJSONLoader: raw_bytes = None try: # We need to force a data reload from S3 - config = Config(connect_timeout=10, retries={'max_attempts': 3}) - s3 = boto3.resource('s3', config=config) + config = Config(connect_timeout=10, retries={"max_attempts": 3}) + s3 = boto3.resource("s3", config=config) start_load = time.time() raw_bytes = ( - s3 - .Object(self._s3_bucket, self._s3_key) - .get()['Body'] - .read() + s3.Object(self._s3_bucket, self._s3_key).get()["Body"].read() ) end_load = time.time() - load_time = (end_load - start_load) - msg = "Loaded JSON from S3: {}. Byte count: {:d}. Time to Load: {:0.3f}" + load_time = end_load - start_load + + raw_data = raw_bytes.decode("utf-8") + + msg = "Loaded S3: {}. Byte count: {:d}. Time to Load: {:0.3f}" msg_params = self._key_str, len(raw_bytes), load_time self.logger.info(msg.format(*msg_params)) - raw_data = ( - raw_bytes.decode('utf-8') - ) - # It is possible to have corrupted files in S3, so # protect against that. try: @@ -97,9 +102,10 @@ class LazyJSONLoader: # requests. self._expiry_time = 0 - self.logger.error("Cannot parse JSON resource from S3", extra={ - "bucket": self._s3_bucket, - "key": self._s3_key}) + self.logger.error( + "Cannot parse JSON resource from S3", + extra={"bucket": self._s3_bucket, "key": self._s3_key}, + ) return self._cached_copy except Exception: @@ -109,7 +115,8 @@ class LazyJSONLoader: # requests. self._expiry_time = 0 - self.logger.exception("Failed to download from S3", extra={ - "bucket": self._s3_bucket, - "key": self._s3_key}) + self.logger.exception( + "Failed to download from S3", + extra={"bucket": self._s3_bucket, "key": self._s3_key}, + ) return self._cached_copy diff --git a/tests/test_lazys3.py b/tests/test_lazys3.py new file mode 100644 index 0000000..2a9d8af --- /dev/null +++ b/tests/test_lazys3.py @@ -0,0 +1,94 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +import json + +from taar.recommenders.lazys3 import LazyJSONLoader + +import boto3 +from moto import mock_s3 + + +from taar.recommenders.s3config import ( + TAAR_SIMILARITY_BUCKET, + TAAR_SIMILARITY_DONOR_KEY, +) + + +def install_categorical_data(ctx): + ctx = ctx.child() + conn = boto3.resource("s3", region_name="us-west-2") + + try: + conn.create_bucket(Bucket=TAAR_SIMILARITY_BUCKET) + except Exception: + pass + + conn.Object(TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_DONOR_KEY).put( + Body=json.dumps({"test": "donor_key"}) + ) + + ctx["similarity_donors_pool"] = LazyJSONLoader( + ctx, TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_DONOR_KEY + ) + + return ctx + + +@mock_s3 +def test_does_it_load(test_ctx): + ctx = install_categorical_data(test_ctx) + + jdata, status = ctx["similarity_donors_pool"].get() + assert jdata["test"] == "donor_key" + check_jdata_status(jdata, status) + + +@mock_s3 +def test_cached_load(test_ctx): + ctx = install_categorical_data(test_ctx) + loader = ctx["similarity_donors_pool"] + jdata, status = loader.get() + check_jdata_status(jdata, status) + jdata, status = loader.get() + assert not status + + +@mock_s3 +def test_reload_on_expiry(test_ctx): + ctx = install_categorical_data(test_ctx) + loader = ctx["similarity_donors_pool"] + + jdata, status = loader.get() + check_jdata_status(jdata, status) + jdata, status = loader.get() + assert not status + + # Force expirty time to be 10 seconds ago + loader._expiry_time = loader._clock.time() - 10 + + jdata, status = loader.get() + check_jdata_status(jdata, status) + + +@mock_s3 +def test_force_expiry(test_ctx): + ctx = install_categorical_data(test_ctx) + loader = ctx["similarity_donors_pool"] + + jdata, status = loader.get() + check_jdata_status(jdata, status) + jdata, status = loader.get() + assert not status + + # Force expirty time to be 10 seconds ago + loader.force_expiry() + + jdata, status = loader.get() + check_jdata_status(jdata, status) + + +def check_jdata_status(jdata, status): + assert jdata == {'test': 'donor_key'} + assert status