GCS migration, packaging fixes (#188)

* Switch from S3 to GCS
* Refactor the TAAR cache to provide an independent context for the Ensemble Spark job
* Reduce external dependencies of the recommenders used in the Ensemble Spark job
* Settings refactoring
* Update docs
Evgeny Pavlov 2021-02-05 15:23:11 -08:00 committed by GitHub
Parent 47c797d9bd
Commit 3bdfcaf428
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
39 changed files: 1145 additions and 1097 deletions


@ -22,7 +22,7 @@ jobs:
- setup_remote_docker:
docker_layer_caching: true
- run: apt-get update; apt-get install make -y
- run: . /opt/conda/etc/profile.d/conda.sh && conda env create -n taar-37 --file enviroment.yml
- run: . /opt/conda/etc/profile.d/conda.sh && conda env create -n taar-37 --file environment.yml
- run: . /opt/conda/etc/profile.d/conda.sh && conda activate taar-37 && python setup.py install && make pytest
workflows:


@ -14,39 +14,20 @@ RUN apt-get update && \
WORKDIR /app
# First copy requirements.txt so we can take advantage of docker
# First copy requirements so we can take advantage of docker
# caching.
COPY . /app
COPY ./environment.yml /app/environment.yml
RUN conda env update -n taar-37 -f environment.yml
RUN make setup_conda
COPY . /app
RUN python setup.py develop
RUN . /opt/conda/etc/profile.d/conda.sh && \
conda activate taar-37 && \
python setup.py install
conda activate taar-37 && python setup.py install
USER app
ENV TAAR_API_PLUGIN=taar.plugin
ENV TAAR_ITEM_MATRIX_BUCKET=telemetry-public-analysis-2
ENV TAAR_ITEM_MATRIX_KEY=telemetry-ml/addon_recommender/item_matrix.json
ENV TAAR_ADDON_MAPPING_BUCKET=telemetry-public-analysis-2
ENV TAAR_ADDON_MAPPING_KEY=telemetry-ml/addon_recommender/addon_mapping.json
ENV TAAR_ENSEMBLE_BUCKET=telemetry-parquet
ENV TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json
ENV TAAR_WHITELIST_BUCKET=telemetry-parquet
ENV TAAR_WHITELIST_KEY=telemetry-ml/addon_recommender/only_guids_top_200.json
ENV TAAR_LOCALE_BUCKET=telemetry-parquet
ENV TAAR_LOCALE_KEY=taar/locale/top10_dict.json
ENV TAAR_SIMILARITY_BUCKET=telemetry-parquet
ENV TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json
ENV TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json
ENV TAAR_MAX_RESULTS=10
ENV AWS_SECRET_ACCESS_KEY=
ENV AWS_ACCESS_KEY_ID=
ENV BIGTABLE_PROJECT_ID=
ENV BIGTABLE_INSTANCE_ID=
ENV BIGTABLE_TABLE_ID=
# Using /bin/bash as the entrypoint works around some volume mount issues on Windows
# where volume-mounted files do not have execute bits set.
# https://github.com/docker/compose/issues/2301#issuecomment-154450785 has additional background.


@ -6,9 +6,16 @@ all:
setup_conda:
# Install all dependencies and setup repo in dev mode
conda env update -n taar-37 -f enviroment.yml
conda env update -n taar-37 -f environment.yml
python setup.py develop
conda_update:
# Refresh the env after the .yml file was modified
conda env update -n taar-37 -f environment.yml --prune
conda_export:
conda env export > environment.yml
upload:
twine upload --repository-url https://upload.pypi.org/legacy/ dist/*
@ -21,44 +28,17 @@ build:
docker build . -t taar:latest
up:
docker run \
--rm \
--name=taar \
-v ~/.config:/app/.config \
-v ~/.aws:/app/.aws \
-v ~/.gcp_creds:/app/.gcp_creds \
-e WORKERS=1 \
-e THREADS=2 \
-e LOG_LEVEL=20 \
-e GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/vng-taar-stage.json \
-e TAAR_API_PLUGIN=taar.plugin \
-e TAAR_ITEM_MATRIX_BUCKET=telemetry-public-analysis-2 \
-e TAAR_ITEM_MATRIX_KEY=telemetry-ml/addon_recommender/item_matrix.json \
-e TAAR_ADDON_MAPPING_BUCKET=telemetry-public-analysis-2 \
-e TAAR_ADDON_MAPPING_KEY=telemetry-ml/addon_recommender/addon_mapping.json \
-e TAAR_ENSEMBLE_BUCKET=telemetry-parquet \
-e TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json \
-e TAAR_WHITELIST_BUCKET=telemetry-parquet \
-e TAAR_WHITELIST_KEY=telemetry-ml/addon_recommender/only_guids_top_200.json \
-e TAAR_LOCALE_BUCKET=telemetry-parquet \
-e TAAR_LOCALE_KEY=taar/locale/top10_dict.json \
-e TAAR_SIMILARITY_BUCKET=telemetry-parquet \
-e TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json \
-e TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json \
-e TAAR_MAX_RESULTS=10 \
-e TAARLITE_MAX_RESULTS=4 \
-e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
-e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
-e BIGTABLE_PROJECT_ID=moz-fx-data-taar-nonprod-48b6 \
-e BIGTABLE_INSTANCE_ID=taar-stage-202006 \
-p 8000:8000 \
-it taar:latest
docker-compose up
test-container:
docker run -e CODECOV_TOKEN=${CODECOV_TOKEN} -it taar:latest test
run_local:
TAAR_API_PLUGIN=taar.plugin python taar/flask_app.py
. bin/test_env.sh && python taar/flask_app.py -H 0.0.0.0 -P 8001
run_package_test:
python setup.py develop
python bin/run_package_test.py
shell:
docker run -it taar:latest bash


@ -13,9 +13,6 @@ Table of Contents
* [Build and run tests](#build-and-run-tests)
* [Pinning dependencies](#pinning-dependencies)
* [Instructions for releasing updates to production](#instructions-for-releasing-updates-to-production)
* [Dependencies](#dependencies)
* [AWS resources](#aws-resources)
* [AWS enviroment configuration](#aws-enviroment-configuration)
* [Collaborative Recommender](#collaborative-recommender)
* [Ensemble Recommender](#ensemble-recommender)
* [Locale Recommender](#locale-recommender)
@ -26,8 +23,8 @@ Table of Contents
* [Google Cloud BigTable](#google-cloud-bigtable)
* [Production Configuration Settings](#production-configuration-settings)
* [Deleting individual user data from all TAAR resources](#deleting-individual-user-data-from-all-taar-resources)
* [Airflow enviroment configuration](#airflow-enviroment-configuration)
* [Staging Enviroment](#staging-enviroment)
* [Airflow environment configuration](#airflow-environment-configuration)
* [Staging Environment](#staging-environment)
* [A note on cdist optimization\.](#a-note-on-cdist-optimization)
@ -86,10 +83,12 @@ container and run the test suite inside the container.
## Pinning dependencies
TAAR uses miniconda and a enviroment.yml file to manage versioning.
TAAR uses miniconda and an environment.yml file to manage versioning.
To update versions, edit the enviroment.yml with the new dependency
you need. If you are unfamiliar with using conda, see the [official
To update versions, edit the `environment.yml` with the new dependency
you need, then run `make conda_update`.
If you are unfamiliar with using conda, see the [official
documentation](https://docs.conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html)
for reference.
@ -104,7 +103,6 @@ create a new release has been split out into separate
### Google Cloud Storage resources
### TODO: put this into a table to be easier to read
The final TAAR models are stored in:
```gs://moz-fx-data-taar-pr-prod-e0f7-prod-models```
@ -115,63 +113,62 @@ variable `taar_etl_model_storage_bucket`
Temporary models that the Airflow ETL jobs require are stored in a
temporary bucket defined in the Airflow variable `taar_etl_storage_bucket`
### AWS resources
Recommendation engines load models from Amazon S3.
Recommendation engines load models from GCS.
The following table is a complete list of all resources per
recommendation engine.
Recommendation Engine | S3 Resource
Recommendation Engine | GCS Resource
--- | ---
RecommendationManager Whitelist | s3://telemetry-parquet/telemetry-ml/addon_recommender/top_200_whitelist.json
Similarity Recommender | s3://telemetry-parquet/taar/similarity/donors.json <br> s3://telemetry-parquet/taar/similarity/lr_curves.json
CollaborativeRecommender | s3://telemetry-parquet/telemetry-ml/addon_recommender/item_matrix.json <br> s3://telemetry-parquet/telemetry-ml/addon_recommender/addon_mapping.json
LocaleRecommender | s3://telemetry-parquet/taar/locale/top10_dict.json
EnsembleRecommender | s3://telemetry-parquet/taar/ensemble/ensemble_weight.json
RecommendationManager Whitelist | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/addon_recommender/only_guids_top_200.json.bz2
Similarity Recommender | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/similarity/donors.json.bz2 <br> gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/similarity/lr_curves.json.bz2
CollaborativeRecommender | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/addon_recommender/item_matrix.json.bz2 <br> gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/addon_recommender/addon_mapping.json.bz2
LocaleRecommender | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/locale/top10_dict.json.bz2
EnsembleRecommender | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/ensemble/ensemble_weight.json.bz2
TAAR lite | gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/lite/guid_install_ranking.json.bz2 <br/> gs://moz-fx-data-taar-pr-prod-e0f7-prod-models/taar/lite/guid_coinstallation.json.bz2
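For illustration, a minimal sketch of fetching and decoding one of these models, mirroring `_load_from_gcs` from `taar/recommenders/cache.py` later in this diff (the `load_model` helper name is ours; credentials come from `GOOGLE_APPLICATION_CREDENTIALS`):

```python
import bz2
import io
import json

from google.cloud import storage  # pinned as google-cloud-storage in environment.yml


def load_model(bucket_name, path):
    # Download the blob into memory, as TAARCache._load_from_gcs does
    with io.BytesIO() as tmpfile:
        client = storage.Client()
        blob = client.get_bucket(bucket_name).blob(path)
        blob.download_to_file(tmpfile)
        tmpfile.seek(0)
        payload = tmpfile.read()
    if path.endswith(".bz2"):
        payload = bz2.decompress(payload)
    return json.loads(payload.decode("utf8"))


top10_dict = load_model(
    "moz-fx-data-taar-pr-prod-e0f7-prod-models", "taar/locale/top10_dict.json.bz2"
)
```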
### AWS enviroment configuration
TAAR breaks out all S3 data load configuration into enviroment
variables. This ensures that running under test has no chance of
clobbering the production data in the event that a developer has AWS
configuration keys installed locally in `~/.aws/`
Production enviroment variables required for TAAR
# Production environment variables required for TAAR
## Collaborative Recommender
Env Variable | Value
------- | ---
TAAR_ITEM_MATRIX_BUCKET | "telemetry-parquet"
TAAR_ITEM_MATRIX_KEY | "telemetry-ml/addon_recommender/item_matrix.json"
TAAR_ADDON_MAPPING_BUCKET | "telemetry-parquet"
TAAR_ADDON_MAPPING_KEY | "telemetry-ml/addon_recommender/addon_mapping.json"
TAAR_ITEM_MATRIX_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAAR_ITEM_MATRIX_KEY | "addon_recommender/item_matrix.json.bz2"
TAAR_ADDON_MAPPING_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAAR_ADDON_MAPPING_KEY | "addon_recommender/addon_mapping.json.bz2"
## Ensemble Recommender
Env Variable | Value
--- | ---
TAAR_ENSEMBLE_BUCKET | "telemetry-parquet"
TAAR_ENSEMBLE_KEY | "taar/ensemble/ensemble_weight.json"
TAAR_ENSEMBLE_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAAR_ENSEMBLE_KEY | "taar/ensemble/ensemble_weight.json.bz2"
## Locale Recommender
Env Variable | Value
--- | ---
TAAR_LOCALE_BUCKET | "telemetry-parquet"
TAAR_LOCALE_KEY | "taar/locale/top10_dict.json"
TAAR_LOCALE_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAAR_LOCALE_KEY | "taar/locale/top10_dict.json.bz2"
## Similarity Recommender
Env Variable | Value
--- | ---
TAAR_SIMILARITY_BUCKET | "telemetry-parquet"
TAAR_SIMILARITY_DONOR_KEY | "taar/similarity/donors.json"
TAAR_SIMILARITY_LRCURVES_KEY | "taar/similarity/lr_curves.json"
TAAR_SIMILARITY_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAAR_SIMILARITY_DONOR_KEY | "taar/similarity/donors.json.bz2"
TAAR_SIMILARITY_LRCURVES_KEY | "taar/similarity/lr_curves.json.bz2"
## TAAR Lite
Env Variable | Value
--- | ---
TAARLITE_GUID_COINSTALL_BUCKET | "moz-fx-data-taar-pr-prod-e0f7-prod-models"
TAARLITE_GUID_COINSTALL_KEY | "taar/lite/guid_coinstallation.json.bz2"
TAARLITE_GUID_RANKING_KEY | "taar/lite/guid_install_ranking.json.bz2"
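Purely illustrative: a hypothetical sketch of how these variables could map onto a settings object. The real definitions live in `taar.settings` (`AppSettings`, `PackageCacheSettings`, etc.), which this diff only imports:

```python
import os

_PROD_MODELS = "moz-fx-data-taar-pr-prod-e0f7-prod-models"


class CacheSettingsSketch:
    # Hypothetical: each bucket/key pair from the tables above,
    # overridable through the environment.
    TAAR_ENSEMBLE_BUCKET = os.environ.get("TAAR_ENSEMBLE_BUCKET", _PROD_MODELS)
    TAAR_ENSEMBLE_KEY = os.environ.get(
        "TAAR_ENSEMBLE_KEY", "taar/ensemble/ensemble_weight.json.bz2"
    )
    TAARLITE_GUID_COINSTALL_BUCKET = os.environ.get(
        "TAARLITE_GUID_COINSTALL_BUCKET", _PROD_MODELS
    )
    TAARLITE_GUID_COINSTALL_KEY = os.environ.get(
        "TAARLITE_GUID_COINSTALL_KEY", "taar/lite/guid_coinstallation.json.bz2"
    )
```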
## Google Cloud Platform resources
@ -215,7 +212,7 @@ The table ID for user profile information is `taar_profile`.
## Production Configuration Settings
Production enviroment settings are stored in a [private repository](https://github.com/mozilla-services/cloudops-deployment/blob/master/projects/data/puppet/yaml/type/data.api.prod.taar.yaml).
Production environment settings are stored in a [private repository](https://github.com/mozilla-services/cloudops-deployment/blob/master/projects/data/puppet/yaml/type/data.api.prod.taar.yaml).
## Deleting individual user data from all TAAR resources
@ -248,7 +245,7 @@ Users who wish to remove their data from TAAR need to:
## Airflow enviroment configuration
## Airflow environment configuration
TAAR requires some configuration to be stored in Airflow variables for
the ETL jobs to run to completion correctly.
@ -257,13 +254,14 @@ Airflow Variable | Value
--- | ---
taar_gcp_project_id | The Google Cloud Platform project where BigQuery temporary tables, Cloud Storage buckets for Avro files and BigTable reside for TAAR.
taar_etl_storage_bucket | The Cloud Storage bucket name where temporary Avro files will reside when transferring data from BigQuery to BigTable.
taar_etl_model_storage_bucket | The main GCS bucket where the models are stored
taar_bigtable_instance_id | The BigTable instance ID for TAAR user profile information
taar_dataflow_subnetwork | The subnetwork required to communicate between Cloud Dataflow
## Staging Enviroment
## Staging Environment
The staging enviroment of the TAAR service in GCP can be reached using
The staging environment of the TAAR service in GCP can be reached using
curl.
```
@ -304,7 +302,7 @@ Usage: taarlite-redis.py [OPTIONS]
Manage the TAARLite redis cache.
This expecte that the following enviroment variables are set:
This expects that the following environment variables are set:
REDIS_HOST REDIS_PORT

bin/run_package_test.py (new file, 48 lines)

@ -0,0 +1,48 @@
# Emulate package call from Ensemble Spark job
COLLABORATIVE, SIMILARITY, LOCALE = "collaborative", "similarity", "locale"
PREDICTOR_ORDER = [COLLABORATIVE, SIMILARITY, LOCALE]
def load_recommenders():
from taar.recommenders import LocaleRecommender
from taar.recommenders import SimilarityRecommender
from taar.recommenders import CollaborativeRecommender
from taar.context import package_context
ctx = package_context()
lr = LocaleRecommender(ctx)
sr = SimilarityRecommender(ctx)
cr = CollaborativeRecommender(ctx)
return {LOCALE: lr, COLLABORATIVE: cr, SIMILARITY: sr}
if __name__ == '__main__':
for i in range(2):
rec_map = load_recommenders()
recommender_list = [
rec_map[COLLABORATIVE].recommend, # Collaborative
rec_map[SIMILARITY].recommend, # Similarity
rec_map[LOCALE].recommend, # Locale
]
client_data = {"installed_addons": ["uBlock0@raymondhill.net"],
"locale": "en-CA",
"client_id": "test-client-001",
"activeAddons": [],
"geo_city": "brasilia-br",
"subsession_length": 4911,
"os": "mac",
"bookmark_count": 7,
"tab_open_count": 4,
"total_uri": 222,
"unique_tlds": 21
}
for key, rec in rec_map.items():
print(key)
assert rec.can_recommend(client_data)
assert len(rec.recommend(client_data, limit=4)) == 4
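The Makefile exposes this as `make run_package_test`, which installs the package in development mode (`python setup.py develop`) and then runs `python bin/run_package_test.py`.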


@ -1,7 +1,6 @@
#!/usr/bin/env python
from taar.recommenders.redis_cache import TAARCache
from taar.context import default_context
from taar.interfaces import ITAARCache
from taar.context import app_context
import click
@ -22,8 +21,9 @@ def main(reset, load, info):
print("No options were set!")
return
ctx = default_context()
cache = TAARCache.get_instance(ctx)
ctx = app_context()
cache = ctx[ITAARCache]
if reset:
if cache.reset():
print("Successfully flushed db0 bookkeeping database.")


@ -1,16 +1,23 @@
# Set up shell test environment settings
export TAAR_API_PLUGIN=taar.plugin
export TAAR_ITEM_MATRIX_BUCKET=telemetry-public-analysis-2
export TAAR_ITEM_MATRIX_KEY=telemetry-ml/addon_recommender/item_matrix.json
export TAAR_ADDON_MAPPING_BUCKET=telemetry-public-analysis-2
export TAAR_ADDON_MAPPING_KEY=telemetry-ml/addon_recommender/addon_mapping.json
export TAAR_ENSEMBLE_BUCKET=telemetry-parquet
export TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json
export TAAR_WHITELIST_BUCKET=telemetry-parquet
export TAAR_WHITELIST_KEY=telemetry-ml/addon_recommender/only_guids_top_200.json
export TAAR_LOCALE_BUCKET=telemetry-parquet
export TAAR_LOCALE_KEY=taar/locale/top10_dict.json
export TAAR_SIMILARITY_BUCKET=telemetry-parquet
export TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json
export TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json
export TAAR_MAX_RESULTS=4
export DISABLE_REDIS=True
export TAAR_MAX_RESULTS=10
export BIGTABLE_PROJECT_ID=moz-fx-data-taar-pr-prod-e0f7
export BIGTABLE_INSTANCE_ID=taar-prod-202006
export BIGTABLE_TABLE_ID=taar_profile
export TAAR_ITEM_MATRIX_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_ITEM_MATRIX_KEY=addon_recommender/item_matrix.json.bz2
export TAAR_ADDON_MAPPING_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_ADDON_MAPPING_KEY=addon_recommender/addon_mapping.json.bz2
export TAAR_ENSEMBLE_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json.bz2
export TAAR_WHITELIST_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_WHITELIST_KEY=addon_recommender/only_guids_top_200.json.bz2
export TAAR_LOCALE_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_LOCALE_KEY=taar/locale/top10_dict.json.bz2
export TAAR_SIMILARITY_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json.bz2
export TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json.bz2
export TAARLITE_GUID_COINSTALL_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
export TAARLITE_GUID_COINSTALL_KEY=taar/lite/guid_coinstallation.json.bz2
export TAARLITE_GUID_RANKING_KEY=taar/lite/guid_install_ranking.json.bz2


@ -8,34 +8,51 @@ services:
image: "taar:latest"
depends_on:
- redis
volumes:
- /Users/epavlov/.gcp_creds:/app/.gcp_creds
environment:
- MUSICMATCH_API=${MUSICMATCH_API}
- WORKERS=1
- THREADS=2
- LOG_LEVEL=20
- GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/vng-taar-dev-clientinfo-svc.json
- GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/moz-fx-data-taar-pr-prod-e0f7-bf36ebdc13e9.json
- REDIS_HOST=redis
- TAAR_API_PLUGIN=taar.plugin
- TAAR_ITEM_MATRIX_BUCKET=telemetry-public-analysis-2
- TAAR_ITEM_MATRIX_KEY=telemetry-ml/addon_recommender/item_matrix.json
- TAAR_ADDON_MAPPING_BUCKET=telemetry-public-analysis-2
- TAAR_ADDON_MAPPING_KEY=telemetry-ml/addon_recommender/addon_mapping.json
- TAAR_ENSEMBLE_BUCKET=telemetry-parquet
- TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json
- TAAR_WHITELIST_BUCKET=telemetry-parquet
- TAAR_WHITELIST_KEY=telemetry-ml/addon_recommender/only_guids_top_200.json
- TAAR_LOCALE_BUCKET=telemetry-parquet
- TAAR_LOCALE_KEY=taar/locale/top10_dict.json
- TAAR_SIMILARITY_BUCKET=telemetry-parquet
- TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json
- TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json
- TAAR_MAX_RESULTS=10
- TAARLITE_MAX_RESULTS=4
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- BIGTABLE_PROJECT_ID=${BIGTABLE_PROJECT_ID}
- BIGTABLE_INSTANCE_ID=${BIGTABLE_INSTANCE_ID}
- BIGTABLE_TABLE_ID=${BIGTABLE_TABLE_ID}
- BIGTABLE_PROJECT_ID=moz-fx-data-taar-pr-prod-e0f7
- BIGTABLE_INSTANCE_ID=taar-prod-202006
- BIGTABLE_TABLE_ID=taar_profile
ports:
- "8000:8000"
populate-redis:
image: "taar:latest"
command:
- python
- /opt/conda/bin/taar-redis.py
- --load
depends_on:
- redis
volumes:
- /Users/epavlov/.gcp_creds:/app/.gcp_creds
environment:
- LOG_LEVEL=20
- GOOGLE_APPLICATION_CREDENTIALS=/app/.gcp_creds/moz-fx-data-taar-pr-prod-e0f7-bf36ebdc13e9.json
- REDIS_HOST=redis
- TAAR_ITEM_MATRIX_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_ITEM_MATRIX_KEY=addon_recommender/item_matrix.json.bz2
- TAAR_ADDON_MAPPING_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_ADDON_MAPPING_KEY=addon_recommender/addon_mapping.json.bz2
- TAAR_ENSEMBLE_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_ENSEMBLE_KEY=taar/ensemble/ensemble_weight.json.bz2
- TAAR_WHITELIST_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_WHITELIST_KEY=addon_recommender/only_guids_top_200.json.bz2
- TAAR_LOCALE_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_LOCALE_KEY=taar/locale/top10_dict.json.bz2
- TAAR_SIMILARITY_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAAR_SIMILARITY_DONOR_KEY=taar/similarity/donors.json.bz2
- TAAR_SIMILARITY_LRCURVES_KEY=taar/similarity/lr_curves.json.bz2
- TAARLITE_GUID_COINSTALL_BUCKET=moz-fx-data-taar-pr-prod-e0f7-prod-models
- TAARLITE_GUID_COINSTALL_KEY=taar/lite/guid_coinstallation.json.bz2
- TAARLITE_GUID_RANKING_KEY=taar/lite/guid_install_ranking.json.bz2
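With this file, `make up` is now a plain `docker-compose up` (see the Makefile above): it starts Redis, the one-shot `populate-redis` job that preloads the models via `taar-redis.py --load`, and the taar web service on port 8000.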


@ -33,6 +33,7 @@ and are autodeployed to gs://moz-fx-data-prod-airflow-dataproc-artifacts/jobs
* [taar_etl_.taar_amodump](https://github.com/mozilla/taar_gcp_etl/blob/master/taar_etl/taar_amodump.py)
* [taar_etl.taar_amowhitelist](https://github.com/mozilla/taar_gcp_etl/blob/master/taar_etl/taar_amowhitelist.py)
* [taar_etl.taar_update_whitelist](https://github.com/mozilla/taar_gcp_etl/blob/master/taar_etl/taar_update_whitelist.py)
* [taar_etl.taar_lite_guid_ranking](https://github.com/mozilla/taar_gcp_etl/blob/master/taar_etl/taar_lite_guid_ranking.py)
#### 4. TAAR User profile information
@ -95,6 +96,17 @@ Autopush on tag is currently enabled for staging environment.
You must inform operations to push the tag to the production environment.
## Deploying Pypi package required for Ensemble Spark job
Update package version in setup.py
`make all`
`make upload`
Update package version in `taar_weekly` airflow DAG.
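A sketch of how a Spark job might consume the published package, modeled on `bin/run_package_test.py` above (the `client_data` fields are trimmed for brevity):

```python
from taar.context import package_context
from taar.recommenders import (
    CollaborativeRecommender,
    LocaleRecommender,
    SimilarityRecommender,
)

# package_context() wires in the stub logger and the in-process
# TAARCache, so no Redis or web-service settings are needed.
ctx = package_context()
recommenders = {
    "collaborative": CollaborativeRecommender(ctx),
    "similarity": SimilarityRecommender(ctx),
    "locale": LocaleRecommender(ctx),
}

client_data = {"installed_addons": ["uBlock0@raymondhill.net"], "locale": "en-CA"}
for name, rec in recommenders.items():
    if rec.can_recommend(client_data):
        print(name, rec.recommend(client_data, limit=4))
```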
## A note about logging


@ -59,6 +59,7 @@ dependencies:
- google-auth==1.15.0
- google-cloud-bigtable==1.2.1
- google-cloud-core==1.3.0
- google-cloud-storage==1.19.1
- googleapis-common-protos==1.51.0
- grpc-google-iam-v1==0.12.3
- grpcio==1.29.0


@ -3,7 +3,7 @@ from setuptools import find_packages, setup
setup(
name="mozilla-taar3",
use_scm_version=False,
version="0.7.5",
version="1.0.7",
setup_requires=["setuptools_scm", "pytest-runner"],
tests_require=["pytest"],
include_package_data=True,


@ -1,4 +1,3 @@
from .profile_fetcher import ProfileFetcher # noqa
import pkg_resources
__version__ = pkg_resources.require("mozilla-taar3")[0].version


@ -18,6 +18,7 @@ In practice this makes testing easier and allows us to specialize
configuration information as we pass the context through an object
chain.
"""
from taar.interfaces import IMozLogging, ITAARCache
class InvalidInterface(Exception):
@ -49,7 +50,7 @@ class Context:
result = self._delegate[key]
return result
def get(self, key, default):
def get(self, key, default=None):
try:
result = self[key]
except KeyError:
@ -81,22 +82,44 @@ class Context:
return instance
def _default_context(log_level=None):
def package_context():
"""
Prepare context with minimal dependencies for TAAR package to be used in Ensemble recommender Spark job
"""
from taar.settings import PackageCacheSettings
from taar.logs.stubs import LoggingStub
from taar.recommenders.cache import TAARCache
ctx = Context()
from taar.logs import Logging
from taar.logs import IMozLogging
ctx['cache_settings'] = PackageCacheSettings
ctx[IMozLogging] = LoggingStub(ctx)
ctx[ITAARCache] = TAARCache(ctx)
logger = Logging(ctx)
if log_level:
logger.set_log_level(log_level)
ctx[IMozLogging] = logger
return ctx
def default_context(log_level=None):
ctx = _default_context(log_level)
def app_context():
"""
Prepare context for the TAAR web service
"""
from taar.settings import AppSettings, DefaultCacheSettings, RedisCacheSettings
from taar.recommenders.cache import TAARCache
from taar.recommenders.redis_cache import TAARCacheRedis
from taar.logs.moz_logging import Logging
ctx = Context()
logger = Logging(ctx)
logger.set_log_level(AppSettings.PYTHON_LOG_LEVEL)
ctx[IMozLogging] = logger
if AppSettings.DISABLE_REDIS:
ctx['cache_settings'] = DefaultCacheSettings
ctx[ITAARCache] = TAARCache.get_instance(ctx)
else:
ctx['cache_settings'] = RedisCacheSettings
ctx[ITAARCache] = TAARCacheRedis.get_instance(ctx)
from taar.recommenders import CollaborativeRecommender
from taar.recommenders import SimilarityRecommender
from taar.recommenders import LocaleRecommender

taar/interfaces.py (new file, 34 lines)

@ -0,0 +1,34 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# copy paste from https://github.com/mozilla/srgutil to get rid of this heavy legacy dependency
try:
from abc import ABC
except Exception:
from abc import ABCMeta
class ABC(object):
"""Helper class that provides a standard way to create an ABC using
inheritance.
"""
__metaclass__ = ABCMeta
__slots__ = ()
class IMozLogging(ABC):
def get_logger(self, name):
"""Get a logger with the current configuration
"""
def set_log_level(self, level):
"""Set the logs level, fox example 'DEBUG'
"""
class ITAARCache(ABC):
def safe_load_data(self):
raise NotImplementedError()
def cache_context(self):
raise NotImplementedError()
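A minimal sketch of the intended usage, assuming a hypothetical `MyRecommender`: consumers resolve both interfaces from the context, as the recommender changes later in this diff do:

```python
from taar.interfaces import IMozLogging, ITAARCache


class MyRecommender:
    # Hypothetical consumer: both the logger and the cache are looked up
    # on the context, so the Spark job can substitute stubs for either.
    def __init__(self, ctx):
        self._ctx = ctx
        self.logger = ctx[IMozLogging].get_logger("taar")
        self._cache = ctx[ITAARCache]

    def can_recommend(self, client_data, extra_data={}):
        cache = extra_data.get("cache") or self._cache.cache_context()
        return cache["whitelist"] is not None
```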


@ -3,29 +3,11 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# copy paste from https://github.com/mozilla/srgutil to get rid of this heavy legacy dependency
try:
from abc import ABC
except Exception:
from abc import ABCMeta
class ABC(object):
"""Helper class that provides a standard way to create an ABC using
inheritance.
"""
__metaclass__ = ABCMeta
__slots__ = ()
from taar.interfaces import IMozLogging
import logging.config
import sys
class IMozLogging(ABC):
def get_logger(self, name):
"""Get a logger with the current configuration
"""
class ContextFilter(logging.Filter):
"""Enhances log messages with contextual information"""

taar/logs/stubs.py (new file, 39 lines)

@ -0,0 +1,39 @@
from sys import exc_info
from taar.interfaces import IMozLogging
class EmergencyLogger:
"""
We need this one to get rid of the python logging dependency in the Ensemble Spark job
(see the more detailed explanation in the readme).
It uses only print and logs only errors and warnings.
"""
def debug(self, msg, *args, **kwags):
pass
def info(self, msg, *args, **kwags):
pass
def warn(self, msg, *args, **kwags):
print(f'WARN: {msg}')
def warning(self, msg, *args, **kwags):
self.warn(msg)
def error(self, msg, e=None, *args, **kwags):
print(f'ERROR: {msg}, {e or exc_info()}')
def exception(self, msg, *args, **kwargs):
self.error(msg, *args, **kwargs)
class LoggingStub(IMozLogging):
def __init__(self, ctx):
pass
def get_logger(self, name):
return EmergencyLogger()
def set_log_level(self, level):
pass


@ -9,41 +9,35 @@ import markus
from sentry_sdk import capture_exception
# TAAR specific libraries
from taar.context import default_context
from taar.logs import ContextFilter
from taar.context import app_context
from taar.logs.moz_logging import ContextFilter
from taar.profile_fetcher import ProfileFetcher
from taar import recommenders
from taar.settings import (
TAAR_MAX_RESULTS,
TAARLITE_MAX_RESULTS,
STATSD_HOST,
STATSD_PORT,
PYTHON_LOG_LEVEL
)
from taar.recommenders.guid_based_recommender import GuidBasedRecommender
from taar.recommenders.recommendation_manager import RecommenderFactory, RecommendationManager
from taar.settings import AppSettings
def acquire_taarlite_singleton(PROXY_MANAGER):
if PROXY_MANAGER.getTaarLite() is None:
ctx = default_context(log_level=PYTHON_LOG_LEVEL)
ctx = app_context()
root_ctx = ctx.child()
instance = recommenders.GuidBasedRecommender(root_ctx)
instance = GuidBasedRecommender(root_ctx)
PROXY_MANAGER.setTaarLite(instance)
return PROXY_MANAGER.getTaarLite()
def acquire_taar_singleton(PROXY_MANAGER):
if PROXY_MANAGER.getTaarRM() is None:
ctx = default_context(log_level=PYTHON_LOG_LEVEL)
profile_fetcher = ProfileFetcher(ctx)
ctx = app_context()
profile_fetcher = ProfileFetcher(ctx)
ctx["profile_fetcher"] = profile_fetcher
# Lock the context down after we've got basic bits installed
root_ctx = ctx.child()
r_factory = recommenders.RecommenderFactory(root_ctx)
r_factory = RecommenderFactory(root_ctx)
root_ctx["recommender_factory"] = r_factory
instance = recommenders.RecommendationManager(root_ctx.child())
instance = RecommendationManager(root_ctx.child())
PROXY_MANAGER.setTaarRM(instance)
return PROXY_MANAGER.getTaarRM()
@ -120,8 +114,8 @@ def configure_plugin(app): # noqa: C901
# server. Use DatadogMetrics client
"class": "markus.backends.datadog.DatadogMetrics",
"options": {
"statsd_host": STATSD_HOST,
"statsd_port": STATSD_PORT,
"statsd_host": AppSettings.STATSD_HOST,
"statsd_port": AppSettings.STATSD_PORT,
"statsd_namespace": "",
},
}
@ -146,10 +140,10 @@ def configure_plugin(app): # noqa: C901
with ContextFilter(taarlite_recommender.logger, set_extra):
recommendations = taarlite_recommender.recommend(
client_data=cdict, limit=TAARLITE_MAX_RESULTS
client_data=cdict, limit=AppSettings.TAARLITE_MAX_RESULTS
)
if len(recommendations) != TAARLITE_MAX_RESULTS:
if len(recommendations) != AppSettings.TAARLITE_MAX_RESULTS:
recommendations = []
# Strip out weights from TAAR results to maintain compatibility
@ -246,7 +240,7 @@ def configure_plugin(app): # noqa: C901
with ContextFilter(recommendation_manager.logger, set_extra):
recommendations = recommendation_manager.recommend(
client_id=client_id, limit=TAAR_MAX_RESULTS, extra_data=extra_data
client_id=client_id, limit=AppSettings.TAAR_MAX_RESULTS, extra_data=extra_data
)
promoted_guids = extra_data.get("options", {}).get("promoted", [])


@ -2,20 +2,16 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging
from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters
import json
import zlib
import datetime
from taar.settings import (
BIGTABLE_PROJECT_ID,
BIGTABLE_INSTANCE_ID,
BIGTABLE_TABLE_ID,
)
import markus
from taar.settings import AppSettings
metrics = markus.get_metrics("taar")
@ -102,9 +98,9 @@ class ProfileFetcher:
if self.__client is None:
self.__client = BigTableProfileController(
self._ctx,
project_id=BIGTABLE_PROJECT_ID,
instance_id=BIGTABLE_INSTANCE_ID,
table_id=BIGTABLE_TABLE_ID,
project_id=AppSettings.BIGTABLE_PROJECT_ID,
instance_id=AppSettings.BIGTABLE_INSTANCE_ID,
table_id=AppSettings.BIGTABLE_TABLE_ID,
)
return self.__client


@ -1,15 +1,10 @@
from .collaborative_recommender import CollaborativeRecommender
from .guid_based_recommender import GuidBasedRecommender
from .locale_recommender import LocaleRecommender
from .recommendation_manager import RecommendationManager, RecommenderFactory
from .similarity_recommender import SimilarityRecommender
__all__ = [
"CollaborativeRecommender",
"GuidBasedRecommender",
"LocaleRecommender",
"SimilarityRecommender",
"RecommendationManager",
"RecommenderFactory",
]

taar/recommenders/cache.py (new file, 515 lines)

@ -0,0 +1,515 @@
import numpy as np
import bz2
import io
import json
from google.cloud import storage
from taar.interfaces import IMozLogging, ITAARCache
# taarlite guid guid coinstallation matrix
COINSTALL_PREFIX = "coinstall|"
# taarlite guid guid coinstallation matrix filtered by
# minimum installation thresholds
FILTERED_COINSTALL_PREFIX = "filtered_coinstall|"
# taarlite ranking data
RANKING_PREFIX = "ranking|"
# taarlite minimum installation threshold
MIN_INSTALLS_PREFIX = "min_installs|"
# taarlite map of guid->(sum of coinstall counts)
NORMDATA_COUNT_MAP_PREFIX = "normdata_count_map_prefix|"
# taarlite number of times a GUID shows up per row
# of coinstallation data.
NORMDATA_ROWCOUNT_PREFIX = "normdata_rowcount_prefix|"
# taarlite row normalization data
NORMDATA_GUID_ROW_NORM_PREFIX = "normdata_guid_row_norm_prefix|"
# TAAR: Locale data
LOCALE_DATA = "taar_locale_data|"
# TAAR: collaborative data
COLLAB_MAPPING_DATA = "taar_collab_mapping|"
COLLAB_ITEM_MATRIX = "taar_collab_item_matrix|"
# TAAR: similarity data
SIMILARITY_DONORS = "taar_similarity_donors|"
SIMILARITY_LRCURVES = "taar_similarity_lrcurves|"
# TAAR: similarity preprocessed data
SIMILARITY_NUM_DONORS = "taar_similarity_num_donors|"
SIMILARITY_CONTINUOUS_FEATURES = "taar_similarity_continuous_features|"
SIMILARITY_CATEGORICAL_FEATURES = "taar_similarity_categorical_features|"
# TAAR: ensemble weights
ENSEMBLE_WEIGHTS = "taar_ensemble_weights|"
# TAAR: whitelist data
WHITELIST_DATA = "taar_whitelist_data|"
class TAARCache(ITAARCache):
"""
Design of this class is heavily influenced by TAARCacheRedis needs.
In fact, it was extracted from TAARCacheRedis to be used in
EnsembleRecommender weights update Spark job independently from Redis
"""
_instance = None
def __init__(self, ctx):
"""
Don't call this directly - use get_instance instead
"""
self._dict_db = {}
self._similarity_num_donors = 0
self._similarity_continuous_features = None
self._similarity_categorical_features = None
self._ctx = ctx
self._last_db = None
self.logger = None
moz_logging = self._ctx.get(IMozLogging)
self._settings = self._ctx['cache_settings']
self.logger = moz_logging.get_logger("taar") if moz_logging else None
@classmethod
def get_instance(cls, ctx):
if cls._instance is None:
cls._instance = TAARCache(ctx)
return cls._instance
# TAARCacheRedis compatibility
def safe_load_data(self):
if len(self._dict_db) == 0:
self._copy_data(self._dict_db)
self._build_cache_context(self._dict_db)
def _db_get(self, key, default=None, db=None):
self.safe_load_data()
return (db or self._dict_db).get(key, default)
def _db_set(self, key, val, db):
self._dict_db[key] = val
def is_active(self):
"""
return True if data is loaded
"""
return len(self._dict_db) > 0
def ensure_db_loaded(self):
self.safe_load_data()
def cache_context(self):
self.ensure_db_loaded()
return self._cache_context
# Getters
def guid_maps_count_map(self, guid, default=None):
return self._db_get(NORMDATA_COUNT_MAP_PREFIX + guid) or default
def guid_maps_rowcount(self, guid, default=None):
return self._db_get(NORMDATA_ROWCOUNT_PREFIX + guid) or default
def guid_maps_row_norm(self, guid, default=None):
return self._db_get(NORMDATA_GUID_ROW_NORM_PREFIX + guid) or default
def min_installs(self, db):
"""
Return the floor minimum installed addons that we will
consider, or 0 if nothing is currently stored in the cache
"""
result = self._db_get(MIN_INSTALLS_PREFIX, db=db)
if result is None:
return 0
return float(result)
def get_filtered_coinstall(self, guid, default=None):
tmp = self._db_get(FILTERED_COINSTALL_PREFIX + guid)
if tmp:
raw_dict = tmp
# This truncates the size of the coinstall list for
# performance reasons
return dict(
sorted(raw_dict.items(), key=lambda x: x[1], reverse=True)[:self._settings.TAARLITE_TRUNCATE]
)
return default
def get_rankings(self, guid, default=None):
"""
Return the rankings
"""
return self._db_get(RANKING_PREFIX + guid) or default
def has_coinstalls_for(self, guid):
return self._db_get(COINSTALL_PREFIX + guid) is not None
def get_coinstalls(self, guid, default=None):
"""
Return a map of GUID:install count that represents the
coinstallation map for a particular addon GUID
"""
return self._db_get(COINSTALL_PREFIX + guid) or default
def top_addons_per_locale(self):
"""
Get locale data
"""
return self._db_get(LOCALE_DATA)
def collab_raw_item_matrix(self):
"""
Get the taar collaborative item matrix
"""
return self._db_get(COLLAB_ITEM_MATRIX)
def collab_addon_mapping(self):
"""
Get the taar collaborative addon mapping
"""
return self._db_get(COLLAB_MAPPING_DATA)
def similarity_donors(self):
"""
Get the taar similarity donors
"""
return self._db_get(SIMILARITY_DONORS)
def similarity_lrcurves(self):
"""
Get the taar similarity lr curves
"""
return self._db_get(SIMILARITY_LRCURVES)
def similarity_continuous_features(self):
"""
precomputed similarity recommender continuous features cache
"""
self.ensure_db_loaded()
return self._similarity_continuous_features
def similarity_categorical_features(self):
"""
precomputed similarity recommender categorical features cache
"""
self.ensure_db_loaded()
return self._similarity_categorical_features
@property
def similarity_num_donors(self):
"""
precomputed similarity recommender donor count cache
"""
self.ensure_db_loaded()
return self._similarity_num_donors
def ensemble_weights(self):
return self._db_get(ENSEMBLE_WEIGHTS)
def whitelist_data(self):
return self._db_get(WHITELIST_DATA)
# GCS fetching
def _load_from_gcs(self, bucket, path):
"""
Load a JSON object off of a GCS bucket and path.
If the path ends with '.bz2', decompress the object prior to JSON
decode.
"""
try:
with io.BytesIO() as tmpfile:
client = storage.Client()
bucket = client.get_bucket(bucket)
blob = bucket.blob(path)
blob.download_to_file(tmpfile)
tmpfile.seek(0)
payload = tmpfile.read()
if path.endswith(".bz2"):
payload = bz2.decompress(payload)
path = path[:-4]
if path.endswith(".json"):
payload = json.loads(payload.decode("utf8"))
return payload
except Exception:
self.logger.exception(f"Error loading from gcs://{bucket}/{path}")
return None
def _fetch_coinstall_data(self):
return self._load_from_gcs(self._settings.TAARLITE_GUID_COINSTALL_BUCKET,
self._settings.TAARLITE_GUID_COINSTALL_KEY)
def _fetch_ranking_data(self):
return self._load_from_gcs(self._settings.TAARLITE_GUID_COINSTALL_BUCKET,
self._settings.TAARLITE_GUID_RANKING_KEY)
def _fetch_locale_data(self):
return self._load_from_gcs(self._settings.TAAR_LOCALE_BUCKET, self._settings.TAAR_LOCALE_KEY)
def _fetch_collaborative_mapping_data(self):
return self._load_from_gcs(self._settings.TAAR_ADDON_MAPPING_BUCKET, self._settings.TAAR_ADDON_MAPPING_KEY)
def _fetch_collaborative_item_matrix(self):
return self._load_from_gcs(self._settings.TAAR_ITEM_MATRIX_BUCKET, self._settings.TAAR_ITEM_MATRIX_KEY)
def _fetch_similarity_donors(self):
return self._load_from_gcs(self._settings.TAAR_SIMILARITY_BUCKET, self._settings.TAAR_SIMILARITY_DONOR_KEY)
def _fetch_similarity_lrcurves(self):
return self._load_from_gcs(self._settings.TAAR_SIMILARITY_BUCKET, self._settings.TAAR_SIMILARITY_LRCURVES_KEY)
def _fetch_ensemble_weights(self):
return self._load_from_gcs(self._settings.TAAR_ENSEMBLE_BUCKET, self._settings.TAAR_ENSEMBLE_KEY)
def _fetch_whitelist(self):
return self._load_from_gcs(self._settings.TAAR_WHITELIST_BUCKET, self._settings.TAAR_WHITELIST_KEY)
# Data update
def _build_cache_context(self, db):
"""
Fetch from the cache once per request
"""
self._build_similarity_features_caches(db)
tmp = {
# Similarity stuff
"lr_curves": self.similarity_lrcurves(),
"num_donors": self.similarity_num_donors,
"continuous_features": self.similarity_continuous_features(),
"categorical_features": self.similarity_categorical_features(),
"donors_pool": self.similarity_donors(),
# Collaborative
"addon_mapping": self.collab_addon_mapping(),
"raw_item_matrix": self.collab_raw_item_matrix(),
# Locale
"top_addons_per_locale": self.top_addons_per_locale(),
# Ensemble
"whitelist": self.whitelist_data(),
"ensemble_weights": self.ensemble_weights(),
}
def compute_collab_model(val):
if val not in (None, ""):
num_rows = len(val)
num_cols = len(val[0]["features"])
model = np.zeros(shape=(num_rows, num_cols))
for index, row in enumerate(val):
model[index, :] = row["features"]
else:
model = None
return model
tmp["collab_model"] = compute_collab_model(tmp["raw_item_matrix"])
self._cache_context = tmp
def _build_similarity_features_caches(self, db):
"""
This function builds two feature cache matrices and sets the
number of donors (self.similarity_num_donors)
That's the self.categorical_features and
self.continuous_features attributes.
One matrix is for the continuous features and the other is for
the categorical features. This is needed to speed up the similarity
recommendation process."""
from taar.recommenders.similarity_recommender import (
CONTINUOUS_FEATURES,
CATEGORICAL_FEATURES,
)
donors_pool = self._db_get(SIMILARITY_DONORS, db=db)
if donors_pool is None:
return
self._similarity_num_donors = len(donors_pool)
# Build a numpy matrix cache for the continuous features.
continuous_features = np.zeros(
(self.similarity_num_donors, len(CONTINUOUS_FEATURES))
)
for idx, d in enumerate(donors_pool):
features = [d.get(specified_key) for specified_key in CONTINUOUS_FEATURES]
continuous_features[idx] = features
self._similarity_continuous_features = continuous_features
# Build the cache for categorical features.
categorical_features = np.zeros(
(self.similarity_num_donors, len(CATEGORICAL_FEATURES)), dtype="object",
)
for idx, d in enumerate(donors_pool):
features = [d.get(specified_key) for specified_key in CATEGORICAL_FEATURES]
categorical_features[idx] = np.array([features], dtype="object")
self._similarity_categorical_features = categorical_features
self.logger.info("Reconstructed matrices for similarity recommender")
def _update_whitelist_data(self, db):
"""
Load the TAAR whitelist data
"""
tmp = self._fetch_whitelist()
if tmp:
self._db_set(WHITELIST_DATA, tmp, db)
def _update_ensemble_data(self, db):
"""
Load the TAAR ensemble data
"""
tmp = self._fetch_ensemble_weights()
if tmp:
self._db_set(ENSEMBLE_WEIGHTS, tmp["ensemble_weights"], db)
def _update_similarity_data(self, db):
"""
Load the TAAR similarity data
"""
donors = self._fetch_similarity_donors()
lrcurves = self._fetch_similarity_lrcurves()
self._db_set(SIMILARITY_DONORS, donors, db)
self._db_set(SIMILARITY_LRCURVES, lrcurves, db)
def _update_collab_data(self, db):
"""
Load the TAAR collaborative data. This is two parts: an item
matrix and a mapping of GUIDs
"""
# Load the item matrix into redis
item_matrix = self._fetch_collaborative_item_matrix()
self._db_set(COLLAB_ITEM_MATRIX, item_matrix, db)
# Load the taar collaborative mapping data
mapping_data = self._fetch_collaborative_mapping_data()
self._db_set(COLLAB_MAPPING_DATA, mapping_data, db)
def _update_locale_data(self, db):
"""
Load the TAAR locale data
"""
data = self._fetch_locale_data()
result = {}
for locale, guid_list in data.items():
result[locale] = sorted(guid_list, key=lambda x: x[1], reverse=True)
self._db_set(LOCALE_DATA, result, db)
def _update_coinstall_data(self, db):
"""
Load the TAAR Lite GUID GUID coinstallation data
"""
data = self._fetch_coinstall_data()
items = data.items()
len_items = len(items)
guid_count_map = {}
row_count = {}
guid_row_norm = {}
for i, (guid, coinstalls) in enumerate(items):
tmp = dict(
[(k, v) for (k, v) in coinstalls.items() if v >= self.min_installs(db)]
)
self._db_set(FILTERED_COINSTALL_PREFIX + guid, tmp, db)
rowsum = sum(coinstalls.values())
for coinstall_guid, coinstall_count in coinstalls.items():
# Capture the total number of time a GUID was
# coinstalled with other guids
guid_count_map.setdefault(coinstall_guid, 0)
guid_count_map[coinstall_guid] += coinstall_count
# Capture the unique number of times a GUID is
# coinstalled with other guids
row_count.setdefault(coinstall_guid, 0)
row_count[coinstall_guid] += 1
if coinstall_guid not in guid_row_norm:
guid_row_norm[coinstall_guid] = []
guid_row_norm[coinstall_guid].append(1.0 * coinstall_count / rowsum)
self._db_set(COINSTALL_PREFIX + guid, coinstalls, db)
if i % 1000 == 0:
self.logger.info(
f"Loaded {i + 1} of {len_items} GUID-GUID coinstall records into redis"
)
self.logger.info("guidmaps computed - saving")
for guid, guid_count in guid_count_map.items():
self._db_set(NORMDATA_COUNT_MAP_PREFIX + guid, guid_count, db)
for coinstall_guid, coinstall_count in row_count.items():
self._db_set(NORMDATA_ROWCOUNT_PREFIX + coinstall_guid, coinstall_count, db)
for coinstall_guid, norm_val in guid_row_norm.items():
self._db_set(NORMDATA_GUID_ROW_NORM_PREFIX + coinstall_guid, norm_val, db)
self.logger.info("finished saving guidmaps")
def _update_rank_data(self, db):
data = self._fetch_ranking_data()
items = data.items()
len_items = len(items)
for i, (guid, count) in enumerate(items):
self._db_set(RANKING_PREFIX + guid, count, db)
if i % 1000 == 0:
self.logger.info(f"Loaded {i + 1} of {len_items} GUID ranking into redis")
min_installs = np.mean(list(data.values())) * 0.05
self._db_set(MIN_INSTALLS_PREFIX, min_installs, db)
self.logger.info(f"Updated MIN_INSTALLS: {min_installs}")
def _copy_data(self, db):
# Update TAARlite
# it loads a lot of data which we don't need for Ensemble Spark job
if not self._settings.DISABLE_TAAR_LITE:
self._update_rank_data(db)
self._update_coinstall_data(db)
# Update TAAR locale data
self._update_locale_data(db)
# Update TAAR collaborative data
self._update_collab_data(db)
# Update TAAR similarity data
self._update_similarity_data(db)
if not self._settings.DISABLE_ENSEMBLE:
# Update TAAR ensemble data
self._update_ensemble_data(db)
# Update TAAR ensemble data
self._update_whitelist_data(db)
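A short usage sketch, assuming the package context from `taar/context.py` in this diff:

```python
from taar.context import package_context
from taar.interfaces import ITAARCache

ctx = package_context()   # registers TAARCache under the ITAARCache key
cache = ctx[ITAARCache]
cache.safe_load_data()    # first call copies every enabled model from GCS
assert cache.is_active()  # True once the dict-backed db is populated
cache_dict = cache.cache_context()  # the dict the recommenders read from
```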


@ -2,17 +2,11 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
import numpy as np
import operator as op
from .base_recommender import AbstractRecommender
from taar.recommenders.redis_cache import TAARCache
import markus
metrics = markus.get_metrics("taar")
from taar.recommenders.base_recommender import AbstractRecommender
def java_string_hashcode(s):
@ -40,29 +34,21 @@ class CollaborativeRecommender(AbstractRecommender):
self.logger = self._ctx[IMozLogging].get_logger("taar")
self._redis_cache = TAARCache.get_instance(self._ctx)
self._cache = self._ctx[ITAARCache]
def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
tmp = self._cache.cache_context()
return tmp
@property
def addon_mapping(self):
return self._redis_cache.collab_addon_mapping()
@property
def raw_item_matrix(self):
return self._redis_cache.collab_raw_item_matrix()
def can_recommend(self, client_data, extra_data={}):
cache = self._get_cache(extra_data)
# We can't recommend if we don't have our data files.
if (
cache["raw_item_matrix"] is None
or cache["collab_model"] is None
or cache["addon_mapping"] is None
cache["raw_item_matrix"] is None
or cache["collab_model"] is None
or cache["addon_mapping"] is None
):
return False
@ -104,10 +90,10 @@ class CollaborativeRecommender(AbstractRecommender):
hashed_id = addon.get("id")
str_hashed_id = str(hashed_id)
if (
hashed_id in installed_addons_as_hashes
or str_hashed_id not in cache["addon_mapping"]
or cache["addon_mapping"][str_hashed_id].get("isWebextension", False)
is False
hashed_id in installed_addons_as_hashes
or str_hashed_id not in cache["addon_mapping"]
or cache["addon_mapping"][str_hashed_id].get("isWebextension", False)
is False
):
continue
@ -123,21 +109,10 @@ class CollaborativeRecommender(AbstractRecommender):
recommendations = [(s[0], s[1]) for s in sorted_dists[:limit]]
return recommendations
@metrics.timer_decorator("collaborative_recommend")
def recommend(self, client_data, limit, extra_data={}):
# Addons identifiers are stored as positive hash values within the model.
try:
recommendations = self._recommend(client_data, limit, extra_data)
except Exception as e:
recommendations = []
metrics.incr("error_collaborative", value=1)
self.logger.exception(
"Collaborative recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
recommendations = self._recommend(client_data, limit, extra_data)
log_data = (
client_data["client_id"],


@ -7,11 +7,10 @@ import itertools
import markus
from sentry_sdk import capture_exception
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
from taar.recommenders.debug import log_timer_debug
from taar.recommenders.redis_cache import TAARCache
from taar.utils import hasher
from .base_recommender import AbstractRecommender
from taar.recommenders.base_recommender import AbstractRecommender
metrics = markus.get_metrics("taar")
@ -34,7 +33,7 @@ class EnsembleRecommender(AbstractRecommender):
self.RECOMMENDER_KEYS = ["collaborative", "similarity", "locale"]
self._ctx = ctx
self._redis_cache = TAARCache.get_instance(self._ctx)
self._redis_cache = self._ctx[ITAARCache]
self.logger = self._ctx[IMozLogging].get_logger("taar.ensemble")
assert "recommender_factory" in self._ctx
@ -122,17 +121,7 @@ class EnsembleRecommender(AbstractRecommender):
ensemble_weights = cache["ensemble_weights"]
for rkey in self.RECOMMENDER_KEYS:
with log_timer_debug(f"{rkey} recommend invoked", self.logger):
recommender = self._recommender_map[rkey]
if recommender.can_recommend(client_data, extra_data):
raw_results = recommender.recommend(
client_data, extended_limit, extra_data
)
reweighted_results = []
for guid, weight in raw_results:
item = (guid, weight * ensemble_weights[rkey])
reweighted_results.append(item)
flattened_results.extend(reweighted_results)
self._recommend_single(client_data, ensemble_weights, extended_limit, extra_data, flattened_results, rkey)
# Sort the results by the GUID
flattened_results.sort(key=lambda item: item[0])
@ -169,3 +158,27 @@ class EnsembleRecommender(AbstractRecommender):
% log_data
)
return results
def _recommend_single(self, client_data, ensemble_weights, extended_limit, extra_data, flattened_results, rkey):
with log_timer_debug(f"{rkey} recommend invoked", self.logger):
recommender = self._recommender_map[rkey]
if not recommender.can_recommend(client_data, extra_data):
return
with metrics.timer(f"{rkey}_recommend"):
try:
raw_results = recommender.recommend(
client_data, extended_limit, extra_data
)
except Exception as e:
metrics.incr(f"error_{rkey}", value=1)
self.logger.exception(
"{} recommender crashed for {}".format(rkey,
client_data.get("client_id", "no-client-id")
),
e,
)
# Bail out: raw_results is undefined after a crash
return
reweighted_results = []
for guid, weight in raw_results:
item = (guid, weight * ensemble_weights[rkey])
reweighted_results.append(item)
flattened_results.extend(reweighted_results)


@ -3,11 +3,10 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
import markus
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.debug import log_timer_debug
metrics = markus.get_metrics("taar")
@ -53,15 +52,15 @@ class GuidBasedRecommender:
self._ctx = ctx
self.logger = self._ctx[IMozLogging].get_logger("taarlite")
self._redis_cache = TAARCache.get_instance(self._ctx)
self._cache = ctx[ITAARCache]
self.logger.info("GUIDBasedRecommender is initialized")
def cache_ready(self):
return self._redis_cache.is_active()
return self._cache.is_active()
def can_recommend(self, client_data):
# We can't recommend if we don't have our data files.
if not self._redis_cache.is_active():
if not self._cache.is_active():
return False
# If we have data coming from other sources, we can use that for
@ -71,10 +70,10 @@ class GuidBasedRecommender:
return False
# Use a dictionary keyed on the query guid
if not self._redis_cache.has_coinstalls_for(addon_guid):
if not self._cache.has_coinstalls_for(addon_guid):
return False
if not self._redis_cache.get_coinstalls(addon_guid):
if not self._cache.get_coinstalls(addon_guid):
return False
return True
@ -85,7 +84,7 @@ class GuidBasedRecommender:
TAAR lite will yield 4 recommendations for the AMO page
"""
if not self._redis_cache.is_active():
if not self._cache.is_active():
return []
with log_timer_debug(f"Results computed", self.logger):
@ -95,7 +94,7 @@ class GuidBasedRecommender:
# Get the raw co-installation result dictionary
with log_timer_debug("Get filtered coinstallations", self.logger):
result_dict = self._redis_cache.get_filtered_coinstall(addon_guid, {})
result_dict = self._cache.get_filtered_coinstall(addon_guid, {})
with log_timer_debug("acquire normalization method", self.logger):
normalize = client_data.get("normalize", NORM_MODE_ROWNORMSUM)
@ -144,7 +143,7 @@ class GuidBasedRecommender:
)
for k, v in rank_sorted[:TWICE_LIMIT]:
lex_value = "{0:020.10f}.{1:010d}".format(
v, self._redis_cache.get_rankings(k, 0)
v, self._cache.get_rankings(k, 0)
)
result_list.append((k, lex_value))
@ -169,7 +168,7 @@ class GuidBasedRecommender:
output_result_dict = {}
for result_guid, result_count in input_coinstall_dict.items():
output_result_dict[result_guid] = (
1.0 * result_count / self._redis_cache.guid_maps_rowcount(result_guid)
1.0 * result_count / self._cache.guid_maps_rowcount(result_guid)
)
return output_result_dict
@ -182,7 +181,7 @@ class GuidBasedRecommender:
def generate_row_sum_list():
for guid, guid_weight in input_coinstall_dict.items():
norm_guid_weight = (
guid_weight * 1.0 / self._redis_cache.guid_maps_count_map(guid)
guid_weight * 1.0 / self._cache.guid_maps_count_map(guid)
)
yield guid, norm_guid_weight
@ -205,7 +204,7 @@ class GuidBasedRecommender:
):
output_dict = {}
for output_guid, output_guid_weight in tmp_dict.items():
guid_row_norm_list = self._redis_cache.guid_maps_row_norm(
guid_row_norm_list = self._cache.guid_maps_row_norm(
output_guid, []
)
if len(guid_row_norm_list) == 0:
@ -270,7 +269,7 @@ class GuidBasedRecommender:
# Add in the next level
level -= 1
for guid in consolidated_coinstall_dict.keys():
next_level_coinstalls = self._redis_cache.get_coinstalls(guid, {})
next_level_coinstalls = self._cache.get_coinstalls(guid, {})
if next_level_coinstalls != {}:
# Normalize the next bunch of suggestions
next_level_coinstalls = self._normalize_row_weights(


@ -2,14 +2,10 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import markus
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
from .base_recommender import AbstractRecommender
from taar.recommenders.redis_cache import TAARCache
metrics = markus.get_metrics("taar")
class LocaleRecommender(AbstractRecommender):
@ -28,18 +24,14 @@ class LocaleRecommender(AbstractRecommender):
self.logger = self._ctx[IMozLogging].get_logger("taar")
self._redis_cache = TAARCache.get_instance(self._ctx)
self._cache = self._ctx[ITAARCache]
def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
tmp = self._cache.cache_context()
return tmp
@property
def top_addons_per_locale(self):
return self._redis_cache.top_addons_per_locale()
def can_recommend(self, client_data, extra_data={}):
cache = self._get_cache(extra_data)
@ -63,23 +55,7 @@ class LocaleRecommender(AbstractRecommender):
return True
@metrics.timer_decorator("locale_recommend")
def recommend(self, client_data, limit, extra_data={}):
try:
result_list = self._recommend(client_data, limit, extra_data)
except Exception as e:
result_list = []
metrics.incr("error_locale", value=1)
self.logger.exception(
"Locale recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
return result_list
def _recommend(self, client_data, limit, extra_data={}):
cache = self._get_cache(extra_data)
# If we have data coming from multiple sources, prefer the one
# from 'client_data'.


@ -4,14 +4,13 @@
import markus
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
from taar.recommenders.debug import log_timer_debug
from taar.recommenders.ensemble_recommender import (
EnsembleRecommender,
is_test_client,
)
from taar.recommenders.randomizer import reorder_guids
from taar.recommenders.redis_cache import TAARCache
metrics = markus.get_metrics("taar")
@ -45,7 +44,7 @@ class RecommendationManager:
"""Initialize the user profile fetcher and the recommenders.
"""
self._ctx = ctx
self.logger = self._ctx[IMozLogging].get_logger("taar")
self.logger = self._ctx[IMozLogging].get_logger("taar") if self._ctx[IMozLogging] else None
assert "profile_fetcher" in self._ctx
@ -55,7 +54,7 @@ class RecommendationManager:
# The whitelist data is only used for test client IDs
self._redis_cache = TAARCache.get_instance(self._ctx)
self._cache = self._ctx[ITAARCache]
@metrics.timer_decorator("profile_recommendation")
def recommend(self, client_id, limit, extra_data={}):
@ -72,7 +71,7 @@ class RecommendationManager:
with log_timer_debug("recommmend executed", self.logger):
# Read everything from redis now
with log_timer_debug("redis read", self.logger):
extra_data["cache"] = self._redis_cache.cache_context()
extra_data["cache"] = self._cache.cache_context()
if is_test_client(client_id):
# Just create a stub client_info blob


@ -6,47 +6,8 @@ import json
import os
import threading
import redis
import numpy as np
from taar.logs import IMozLogging
from taar.settings import (
REDIS_HOST,
REDIS_PORT,
)
# TAARLite configuration
from taar.settings import (
TAARLITE_GUID_COINSTALL_BUCKET,
TAARLITE_GUID_COINSTALL_KEY,
TAARLITE_GUID_RANKING_KEY,
TAARLITE_TRUNCATE,
TAARLITE_MUTEX_TTL,
)
# TAAR configuration
from taar.settings import (
# Locale
TAAR_LOCALE_BUCKET,
TAAR_LOCALE_KEY,
# Collaborative dta
TAAR_ADDON_MAPPING_BUCKET,
TAAR_ADDON_MAPPING_KEY,
TAAR_ITEM_MATRIX_BUCKET,
TAAR_ITEM_MATRIX_KEY,
# Similarity data
TAAR_SIMILARITY_BUCKET,
TAAR_SIMILARITY_DONOR_KEY,
TAAR_SIMILARITY_LRCURVES_KEY,
# Ensemble data
TAAR_ENSEMBLE_BUCKET,
TAAR_ENSEMBLE_KEY,
# Whitelist data
TAAR_WHITELIST_BUCKET,
TAAR_WHITELIST_KEY,
)
from jsoncache.loader import s3_json_loader
from taar.recommenders.cache import TAARCache, RANKING_PREFIX, COINSTALL_PREFIX
# This marks which of the redis databases is currently
@ -57,54 +18,6 @@ ACTIVE_DB = "active_db"
UPDATE_CHECK = "update_mutex|"
# taarlite guid guid coinstallation matrix
COINSTALL_PREFIX = "coinstall|"
# taarlite GUID->GUID coinstallation matrix filtered by
# minimum installation thresholds
FILTERED_COINSTALL_PREFIX = "filtered_coinstall|"
# taarlite ranking data
RANKING_PREFIX = "ranking|"
# taarlite minimum installation threshold
MIN_INSTALLS_PREFIX = "min_installs|"
# taarlite map of guid->(sum of coinstall counts)
NORMDATA_COUNT_MAP_PREFIX = "normdata_count_map_prefix|"
# taarlite number of times a GUID shows up per row
# of coinstallation data.
NORMDATA_ROWCOUNT_PREFIX = "normdata_rowcount_prefix|"
# taarlite row normalization data
NORMDATA_GUID_ROW_NORM_PREFIX = "normdata_guid_row_norm_prefix|"
# TAAR: Locale data
LOCALE_DATA = "taar_locale_data|"
# TAAR: collaborative data
COLLAB_MAPPING_DATA = "taar_collab_mapping|"
COLLAB_ITEM_MATRIX = "taar_collab_item_matrix|"
# TAAR: similarity data
SIMILARITY_DONORS = "taar_similarity_donors|"
SIMILARITY_LRCURVES = "taar_similarity_lrcurves|"
# TAAR: similarity preprocessed data
SIMILARITY_NUM_DONORS = "taar_similarity_num_donors|"
SIMILARITY_CONTINUOUS_FEATURES = "taar_similarity_continuous_features|"
SIMILARITY_CATEGORICAL_FEATURES = "taar_similarity_categorical_features|"
# TAAR: ensemble weights
ENSEMBLE_WEIGHTS = "taar_ensemble_weights|"
# TAAR: whitelist data
WHITELIST_DATA = "taar_whitelist_data|"
class PrefixStripper:
def __init__(self, prefix, iterator, cast_to_str=False):
self._prefix = prefix
@ -116,13 +29,13 @@ class PrefixStripper:
def __next__(self):
result = self._iter.__next__()
result = result[len(self._prefix) :]
result = result[len(self._prefix):]
if self._cast_to_str:
result = str(result)
return result
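PrefixStripper wraps a redis key iterator and yields each key with its namespace prefix removed. A hypothetical usage sketch (the full class presumably also defines __iter__, which the hunk elides):

    keys = iter(["ranking|guid-1", "ranking|guid-2"])  # redis returns bytes in practice
    stripped = PrefixStripper("ranking|", keys)
    assert next(stripped) == "guid-1"
    assert next(stripped) == "guid-2"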
class TAARCache:
class TAARCacheRedis(TAARCache):
"""
This class manages a redis instance to hold onto the taar-lite
GUID->GUID co-installation data
@ -133,29 +46,21 @@ class TAARCache:
@classmethod
def get_instance(cls, ctx):
if cls._instance is None:
cls._instance = TAARCache(ctx, i_didnt_read_the_docs=False)
cls._instance = TAARCacheRedis(ctx, i_didnt_read_the_docs=False)
return cls._instance
def __init__(self, ctx, i_didnt_read_the_docs=True):
"""
Don't call this directly - use get_instance instead
"""
super(TAARCacheRedis, self).__init__(ctx)
if i_didnt_read_the_docs:
raise RuntimeError(
"You cannot call this method directly - use get_instance"
)
self._ctx = ctx
self._last_db = None
self.logger = self._ctx[IMozLogging].get_logger("taar")
# Keep an integer handle (or None) on the last known database
self._last_db = None
self._similarity_num_donors = 0
self._similarity_continuous_features = None
self._similarity_categorical_features = None
rcon = self.init_redis_connections()
self._r0 = rcon[0]
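The get_instance()/i_didnt_read_the_docs pair enforces singleton access: repeated get_instance() calls share one cache, while a direct constructor call fails fast. Illustrative usage, assuming a configured ctx:

    cache = TAARCacheRedis.get_instance(ctx)
    assert cache is TAARCacheRedis.get_instance(ctx)  # one shared instance

    try:
        TAARCacheRedis(ctx)  # bypassing the factory
    except RuntimeError:
        pass  # "You cannot call this method directly - use get_instance"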
@ -184,9 +89,9 @@ class TAARCache:
method to enable mocking for tests
"""
return {
0: redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0),
1: redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=1),
2: redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=2),
0: redis.Redis(host=self._settings.REDIS_HOST, port=self._settings.REDIS_PORT, db=0),
1: redis.Redis(host=self._settings.REDIS_HOST, port=self._settings.REDIS_PORT, db=1),
2: redis.Redis(host=self._settings.REDIS_HOST, port=self._settings.REDIS_PORT, db=2),
}
def safe_load_data(self):
@ -203,7 +108,7 @@ class TAARCache:
#
# The thread barrier will autoexpire in 10 minutes in the
# event of process termination inside the critical section.
self._r0.set(UPDATE_CHECK, self._ident, nx=True, ex=TAARLITE_MUTEX_TTL)
self._r0.set(UPDATE_CHECK, self._ident, nx=True, ex=self._settings.TAARLITE_MUTEX_TTL)
self.logger.info(f"UPDATE_CHECK field is set: {self._ident}")
# This is a concurrency barrier to make sure only the pinned
@ -223,68 +128,14 @@ class TAARCache:
self._r0.delete(UPDATE_CHECK)
self.logger.info("UPDATE_CHECK field is cleared")
def guid_maps_count_map(self, guid, default=None):
tmp = self._db().get(NORMDATA_COUNT_MAP_PREFIX + guid)
def _db_get(self, key, default=None, db=None):
tmp = (db or self._db()).get(key)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
def guid_maps_rowcount(self, guid, default=None):
tmp = self._db().get(NORMDATA_ROWCOUNT_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
def guid_maps_row_norm(self, guid, default=None):
tmp = self._db().get(NORMDATA_GUID_ROW_NORM_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
def min_installs(self, db):
"""
Return the floor minimum installed addons that we will
consider, or 0 if nothing is currently stored in redis
"""
result = db.get(MIN_INSTALLS_PREFIX)
if result is None:
return 0
return float(result.decode("utf8"))
def get_filtered_coinstall(self, guid, default=None):
tmp = self._db().get(FILTERED_COINSTALL_PREFIX + guid)
if tmp:
raw_dict = json.loads(tmp.decode("utf8"))
# This truncates the size of the coinstall list for
# performance reasons
return dict(
sorted(raw_dict.items(), key=lambda x: x[1], reverse=True)[
:TAARLITE_TRUNCATE
]
)
return default
def get_rankings(self, guid, default=None):
"""
Return the rankings
"""
tmp = self._db().get(RANKING_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
def has_coinstalls_for(self, guid):
return self._db().get(COINSTALL_PREFIX + guid) is not None
def get_coinstalls(self, guid, default=None):
"""
Return a map of GUID:install count that represents the
coinstallation map for a particular addon GUID
"""
tmp = self._db().get(COINSTALL_PREFIX + guid)
if tmp:
return json.loads(tmp.decode("utf8"))
return default
def _db_set(self, key, val, db):
db.set(key, json.dumps(val))
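The generic _db_get/_db_set pair replaces the dozen near-identical getters deleted above (guid_maps_*, get_rankings, get_coinstalls, and friends), each of which repeated the same decode-and-json.loads dance. The surviving call sites, moved to taar/recommenders/cache.py in this commit, presumably reduce to one-liners along these lines (a sketch, not the committed code; _db() is supplied by the real class):

    import json

    RANKING_PREFIX = "ranking|"
    COINSTALL_PREFIX = "coinstall|"

    class CacheAccessors:
        def _db_get(self, key, default=None, db=None):
            tmp = (db or self._db()).get(key)
            return json.loads(tmp.decode("utf8")) if tmp else default

        # Former bespoke getters collapse to one-liners:
        def get_rankings(self, guid, default=None):
            return self._db_get(RANKING_PREFIX + guid, default=default)

        def get_coinstalls(self, guid, default=None):
            return self._db_get(COINSTALL_PREFIX + guid, default=default)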
def key_iter_ranking(self):
return PrefixStripper(
@ -303,132 +154,8 @@ class TAARCache:
# Any value in ACTIVE_DB indicates that data is live
return self._r0.get(ACTIVE_DB) is not None
def top_addons_per_locale(self):
"""
Get locale data
"""
tmp = self._db().get(LOCALE_DATA)
if tmp:
return json.loads(tmp.decode("utf8"))
return None
def collab_raw_item_matrix(self):
"""
Get the taar collaborative item matrix
"""
tmp = self._db().get(COLLAB_ITEM_MATRIX)
if tmp:
return json.loads(tmp.decode("utf8"))
return None
def collab_addon_mapping(self):
"""
Get the taar collaborative addon mapping
"""
tmp = self._db().get(COLLAB_MAPPING_DATA)
if tmp:
return json.loads(tmp.decode("utf8"))
return None
def similarity_donors(self):
"""
Get the taar similarity donors
"""
tmp = self._db().get(SIMILARITY_DONORS)
if tmp:
return json.loads(tmp.decode("utf8"))
return None
def similarity_lrcurves(self):
"""
Get the taar similarity lr curves
"""
tmp = self._db().get(SIMILARITY_LRCURVES)
if tmp:
return json.loads(tmp.decode("utf8"))
return None
def similarity_continuous_features(self):
"""
precomputed similarity recommender continuous features cache
"""
def ensure_db_loaded(self):
_ = self._db() # make sure we've computed data from the live redis instance
return self._similarity_continuous_features
def similarity_categorical_features(self):
"""
precomputed similarity recommender categorical features cache
"""
_ = self._db() # make sure we've computed data from the live redis instance
return self._similarity_categorical_features
@property
def similarity_num_donors(self):
"""
precomputed similarity recommender donor count cache
"""
_ = self._db() # make sure we've computed data from the live redis instance
return self._similarity_num_donors
def ensemble_weights(self):
tmp = self._db().get(ENSEMBLE_WEIGHTS)
if tmp:
return json.loads(tmp)
return None
def whitelist_data(self):
tmp = self._db().get(WHITELIST_DATA)
if tmp:
return json.loads(tmp)
return None
def cache_context(self):
self._db()
return self._cache_context
def _build_cache_context(self):
"""
Fetch from redis once per request
"""
tmp = {
# Similarity stuff
"lr_curves": self.similarity_lrcurves(),
"num_donors": self.similarity_num_donors,
"continuous_features": self.similarity_continuous_features(),
"categorical_features": self.similarity_categorical_features(),
"donors_pool": self.similarity_donors(),
# Collaborative
"addon_mapping": self.collab_addon_mapping(),
"raw_item_matrix": self.collab_raw_item_matrix(),
# Locale
"top_addons_per_locale": self.top_addons_per_locale(),
# Ensemble
"whitelist": self.whitelist_data(),
"ensemble_weights": self.ensemble_weights(),
}
def compute_collab_model(val):
if val not in (None, ""):
num_rows = len(val)
num_cols = len(val[0]["features"])
model = np.zeros(shape=(num_rows, num_cols))
for index, row in enumerate(val):
model[index, :] = row["features"]
else:
model = None
return model
tmp["collab_model"] = compute_collab_model(tmp["raw_item_matrix"])
return tmp
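compute_collab_model densifies the raw item matrix, a list of {"features": [...]} rows, into a numpy array once per cache build so the collaborative recommender can do vectorized math per request. A tiny worked example with made-up numbers:

    import numpy as np

    raw_item_matrix = [
        {"id": "row0", "features": [0.1, 0.2]},
        {"id": "row1", "features": [0.3, 0.4]},
    ]
    model = np.zeros(shape=(len(raw_item_matrix), len(raw_item_matrix[0]["features"])))
    for index, row in enumerate(raw_item_matrix):
        model[index, :] = row["features"]
    # model is now array([[0.1, 0.2], [0.3, 0.4]])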
"""
################################
Private methods below
"""
def _db(self):
"""
@ -447,6 +174,7 @@ class TAARCache:
live_db = self._r2
self._update_data_callback(db, live_db)
return live_db
def _update_data_callback(self, db_num, db):
@ -458,216 +186,15 @@ class TAARCache:
return
self._last_db = db_num
self._build_similarity_features_caches(db)
self._cache_context = self._build_cache_context()
self._build_cache_context(db)
self.logger.info("Completed precomputing normalized data")
def _build_similarity_features_caches(self, db):
"""
This function builds two feature cache matrices and sets the
number of donors (self.similarity_num_donors)
That's the self.categorical_features and
self.continuous_features attributes.
One matrix is for the continuous features and the other is for
the categorical features. This is needed to speed up the similarity
recommendation process."""
from taar.recommenders.similarity_recommender import (
CONTINUOUS_FEATURES,
CATEGORICAL_FEATURES,
)
tmp = db.get(SIMILARITY_DONORS)
if tmp is None:
return
donors_pool = json.loads(tmp.decode("utf8"))
self._similarity_num_donors = len(donors_pool)
# Build a numpy matrix cache for the continuous features.
continuous_features = np.zeros(
(self.similarity_num_donors, len(CONTINUOUS_FEATURES))
)
for idx, d in enumerate(donors_pool):
features = [d.get(specified_key) for specified_key in CONTINUOUS_FEATURES]
continuous_features[idx] = features
self._similarity_continuous_features = continuous_features
# Build the cache for categorical features.
categorical_features = np.zeros(
(self.similarity_num_donors, len(CATEGORICAL_FEATURES)), dtype="object",
)
for idx, d in enumerate(donors_pool):
features = [d.get(specified_key) for specified_key in CATEGORICAL_FEATURES]
categorical_features[idx] = np.array([features], dtype="object")
self._similarity_categorical_features = categorical_features
self.logger.info("Reconstructed matrices for similarity recommender")
@property
def _ident(self):
""" pid/thread identity """
return f"{os.getpid()}_{threading.get_ident()}"
def _fetch_coinstall_data(self):
return s3_json_loader(
TAARLITE_GUID_COINSTALL_BUCKET, TAARLITE_GUID_COINSTALL_KEY
)
def _fetch_ranking_data(self):
return s3_json_loader(TAARLITE_GUID_COINSTALL_BUCKET, TAARLITE_GUID_RANKING_KEY)
def _fetch_locale_data(self):
return s3_json_loader(TAAR_LOCALE_BUCKET, TAAR_LOCALE_KEY)
def _fetch_collaborative_mapping_data(self):
return s3_json_loader(TAAR_ADDON_MAPPING_BUCKET, TAAR_ADDON_MAPPING_KEY)
def _fetch_collaborative_item_matrix(self):
return s3_json_loader(TAAR_ITEM_MATRIX_BUCKET, TAAR_ITEM_MATRIX_KEY)
def _fetch_similarity_donors(self):
return s3_json_loader(TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_DONOR_KEY,)
def _fetch_similarity_lrcurves(self):
return s3_json_loader(TAAR_SIMILARITY_BUCKET, TAAR_SIMILARITY_LRCURVES_KEY,)
def _fetch_ensemble_weights(self):
return s3_json_loader(TAAR_ENSEMBLE_BUCKET, TAAR_ENSEMBLE_KEY)
def _fetch_whitelist(self):
return s3_json_loader(TAAR_WHITELIST_BUCKET, TAAR_WHITELIST_KEY)
def _update_whitelist_data(self, db):
"""
Load the TAAR whitelist data
"""
tmp = self._fetch_whitelist()
if tmp:
db.set(WHITELIST_DATA, json.dumps(tmp))
def _update_ensemble_data(self, db):
"""
Load the TAAR ensemble data
"""
tmp = self._fetch_ensemble_weights()
if tmp:
db.set(ENSEMBLE_WEIGHTS, json.dumps(tmp["ensemble_weights"]))
def _update_similarity_data(self, db):
"""
Load the TAAR similarity data
"""
donors = self._fetch_similarity_donors()
lrcurves = self._fetch_similarity_lrcurves()
db.set(SIMILARITY_DONORS, json.dumps(donors))
db.set(SIMILARITY_LRCURVES, json.dumps(lrcurves))
def _update_collab_data(self, db):
"""
Load the TAAR collaborative data. This is two parts: an item
matrix and a mapping of GUIDs
"""
# Load the item matrix into redis
item_matrix = self._fetch_collaborative_item_matrix()
db.set(COLLAB_ITEM_MATRIX, json.dumps(item_matrix))
# Load the taar collaborative mapping data
mapping_data = self._fetch_collaborative_mapping_data()
db.set(COLLAB_MAPPING_DATA, json.dumps(mapping_data))
def _update_locale_data(self, db):
"""
Load the TAAR locale data
"""
data = self._fetch_locale_data()
result = {}
for locale, guid_list in data.items():
result[locale] = sorted(guid_list, key=lambda x: x[1], reverse=True)
db.set(LOCALE_DATA, json.dumps(result))
def _update_coinstall_data(self, db):
"""
Load the TAAR Lite GUID->GUID coinstallation data
"""
data = self._fetch_coinstall_data()
items = data.items()
len_items = len(items)
guid_count_map = {}
row_count = {}
guid_row_norm = {}
for i, (guid, coinstalls) in enumerate(items):
tmp = dict(
[(k, v) for (k, v) in coinstalls.items() if v >= self.min_installs(db)]
)
db.set(FILTERED_COINSTALL_PREFIX + guid, json.dumps(tmp))
rowsum = sum(coinstalls.values())
for coinstall_guid, coinstall_count in coinstalls.items():
# Capture the total number of times a GUID was
# coinstalled with other guids
guid_count_map.setdefault(coinstall_guid, 0)
guid_count_map[coinstall_guid] += coinstall_count
# Capture the unique number of times a GUID is
# coinstalled with other guids
row_count.setdefault(coinstall_guid, 0)
row_count[coinstall_guid] += 1
if coinstall_guid not in guid_row_norm:
guid_row_norm[coinstall_guid] = []
guid_row_norm[coinstall_guid].append(1.0 * coinstall_count / rowsum)
db.set(COINSTALL_PREFIX + guid, json.dumps(coinstalls))
if i % 1000 == 0:
self.logger.info(
f"Loaded {i+1} of {len_items} GUID-GUID coinstall records into redis"
)
self.logger.info("guidmaps computed - saving to redis")
for guid, guid_count in guid_count_map.items():
db.set(NORMDATA_COUNT_MAP_PREFIX + guid, json.dumps(guid_count))
for coinstall_guid, coinstall_count in row_count.items():
db.set(
NORMDATA_ROWCOUNT_PREFIX + coinstall_guid, json.dumps(coinstall_count),
)
for coinstall_guid, norm_val in guid_row_norm.items():
db.set(
NORMDATA_GUID_ROW_NORM_PREFIX + coinstall_guid, json.dumps(norm_val),
)
self.logger.info("finished saving guidmaps to redis")
def _update_rank_data(self, db):
data = self._fetch_ranking_data()
items = data.items()
len_items = len(items)
for i, (guid, count) in enumerate(items):
db.set(RANKING_PREFIX + guid, json.dumps(count))
if i % 1000 == 0:
self.logger.info(f"Loaded {i+1} of {len_items} GUID ranking into redis")
min_installs = np.mean(list(data.values())) * 0.05
db.set(MIN_INSTALLS_PREFIX, min_installs)
self.logger.info(f"Updated MIN_INSTALLS: {min_installs}")
def _load_data(self):
active_db = self._r0.get(ACTIVE_DB)
if active_db is not None:
@ -679,12 +206,6 @@ class TAARCache:
else:
next_active_db = 1
self._copy_data(next_active_db)
self._r0.set(ACTIVE_DB, next_active_db)
self.logger.info(f"Active DB is set to {next_active_db}")
def _copy_data(self, next_active_db):
if next_active_db == 1:
db = self._r1
else:
@ -693,21 +214,7 @@ class TAARCache:
# Clear this database before we do anything with it
db.flushdb()
# Update TAARlite
self._update_rank_data(db)
self._update_coinstall_data(db)
self._copy_data(db)
# Update TAAR locale data
self._update_locale_data(db)
# Update TAAR collaborative data
self._update_collab_data(db)
# Update TAAR similarity data
self._update_similarity_data(db)
# Update TAAR ensemble data
self._update_ensemble_data(db)
# Update TAAR ensemble data
self._update_whitelist_data(db)
self._r0.set(ACTIVE_DB, next_active_db)
self.logger.info(f"Active DB is set to {next_active_db}")

View file

@ -2,16 +2,11 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .base_recommender import AbstractRecommender
from taar.recommenders.base_recommender import AbstractRecommender
from itertools import groupby
from scipy.spatial import distance
from taar.logs import IMozLogging
from taar.interfaces import IMozLogging, ITAARCache
import numpy as np
from taar.recommenders.redis_cache import TAARCache
import markus
metrics = markus.get_metrics("taar")
FLOOR_DISTANCE_ADJUSTMENT = 0.001
@ -46,34 +41,14 @@ class SimilarityRecommender(AbstractRecommender):
def __init__(self, ctx):
self._ctx = ctx
self._redis_cache = TAARCache.get_instance(self._ctx)
self._cache = self._ctx[ITAARCache]
self.logger = self._ctx[IMozLogging].get_logger("taar")
@property
def categorical_features(self):
return self._redis_cache.similarity_categorical_features()
@property
def continuous_features(self):
return self._redis_cache.similarity_continuous_features()
@property
def num_donors(self):
return self._redis_cache.similarity_num_donors
@property
def donors_pool(self):
return self._redis_cache.similarity_donors()
@property
def lr_curves(self):
return self._redis_cache.similarity_lrcurves()
def _get_cache(self, extra_data):
tmp = extra_data.get("cache", None)
if tmp is None:
tmp = self._redis_cache.cache_context()
tmp = self._cache.cache_context()
return tmp
"""
@ -234,19 +209,6 @@ class SimilarityRecommender(AbstractRecommender):
)
return recommendations_out
@metrics.timer_decorator("similarity_recommend")
def recommend(self, client_data, limit, extra_data={}):
try:
recommendations_out = self._recommend(client_data, limit, extra_data)
except Exception as e:
recommendations_out = []
metrics.incr("error_similarity", value=1)
self.logger.exception(
"Similarity recommender crashed for {}".format(
client_data.get("client_id", "no-client-id")
),
e,
)
recommendations_out = self._recommend(client_data, limit, extra_data)
return recommendations_out[:limit]

View file

@ -5,79 +5,82 @@
from decouple import config
PYTHON_LOG_LEVEL = config("PYTHON_LOG_LEVEL", "INFO")
REDIS_HOST = config("REDIS_HOST", "localhost", cast=str)
REDIS_PORT = config("REDIS_PORT", 6379, cast=int)
class AppSettings:
PYTHON_LOG_LEVEL = config("PYTHON_LOG_LEVEL", "INFO")
STATSD_HOST = config("STATSD_HOST", default="localhost", cast=str)
STATSD_PORT = config("STATSD_PORT", default=8125, cast=int)
# These are configurations that are specific to the TAAR library
TAAR_MAX_RESULTS = config("TAAR_MAX_RESULTS", default=10, cast=int)
DISABLE_REDIS = config("DISABLE_REDIS", False, cast=bool)
TAARLITE_MAX_RESULTS = config("TAARLITE_MAX_RESULTS", default=4, cast=int)
TAAR_MAX_RESULTS = config("TAAR_MAX_RESULTS", default=10, cast=int)
TAARLITE_MAX_RESULTS = config("TAARLITE_MAX_RESULTS", default=4, cast=int)
STATSD_HOST = config("STATSD_HOST", default="localhost", cast=str)
STATSD_PORT = config("STATSD_PORT", default=8125, cast=int)
TAAR_ENSEMBLE_BUCKET = config("TAAR_ENSEMBLE_BUCKET", default="test_ensemble_bucket")
TAAR_ENSEMBLE_KEY = config("TAAR_ENSEMBLE_KEY", default="test_ensemble_key")
TAAR_WHITELIST_BUCKET = config("TAAR_WHITELIST_BUCKET", default="test_whitelist_bucket")
TAAR_WHITELIST_KEY = config("TAAR_WHITELIST_KEY", default="test_whitelist_key")
TAAR_ITEM_MATRIX_BUCKET = config(
"TAAR_ITEM_MATRIX_BUCKET", default="test_matrix_bucket"
)
TAAR_ITEM_MATRIX_KEY = config("TAAR_ITEM_MATRIX_KEY", default="test_matrix_key")
TAAR_ADDON_MAPPING_BUCKET = config(
"TAAR_ADDON_MAPPING_BUCKET", default="test_mapping_bucket"
)
TAAR_ADDON_MAPPING_KEY = config("TAAR_ADDON_MAPPING_KEY", default="test_mapping_key")
TAAR_LOCALE_BUCKET = config("TAAR_LOCALE_BUCKET", default="test_locale_bucket")
TAAR_LOCALE_KEY = config("TAAR_LOCALE_KEY", default="test_locale_key")
# Bigtable config
BIGTABLE_PROJECT_ID = config(
"BIGTABLE_PROJECT_ID", default="cfr-personalization-experiment"
)
BIGTABLE_INSTANCE_ID = config("BIGTABLE_INSTANCE_ID", default="taar-profile")
BIGTABLE_TABLE_ID = config("BIGTABLE_TABLE_ID", default="taar_profile")
TAAR_SIMILARITY_BUCKET = config(
"TAAR_SIMILARITY_BUCKET", default="test_similarity_bucket"
)
TAAR_SIMILARITY_DONOR_KEY = config(
"TAAR_SIMILARITY_DONOR_KEY", default="test_similarity_donor_key"
)
TAAR_SIMILARITY_LRCURVES_KEY = config(
"TAAR_SIMILARITY_LRCURVES_KEY", default="test_similarity_lrcurves_key"
)
class DefaultCacheSettings:
DISABLE_TAAR_LITE = config("DISABLE_TAAR_LITE", False, cast=bool)
DISABLE_ENSEMBLE = config("DISABLE_ENSEMBLE", False, cast=bool)
TAAR_ENSEMBLE_BUCKET = config("TAAR_ENSEMBLE_BUCKET", default="test_ensemble_bucket")
TAAR_ENSEMBLE_KEY = config("TAAR_ENSEMBLE_KEY", default="test_ensemble_key")
TAAR_WHITELIST_BUCKET = config("TAAR_WHITELIST_BUCKET", default="test_whitelist_bucket")
TAAR_WHITELIST_KEY = config("TAAR_WHITELIST_KEY", default="test_whitelist_key")
TAAR_ITEM_MATRIX_BUCKET = config("TAAR_ITEM_MATRIX_BUCKET", default="test_matrix_bucket")
TAAR_ITEM_MATRIX_KEY = config("TAAR_ITEM_MATRIX_KEY", default="test_matrix_key")
TAAR_ADDON_MAPPING_BUCKET = config("TAAR_ADDON_MAPPING_BUCKET", default="test_mapping_bucket")
TAAR_ADDON_MAPPING_KEY = config("TAAR_ADDON_MAPPING_KEY", default="test_mapping_key")
TAAR_LOCALE_BUCKET = config("TAAR_LOCALE_BUCKET", default="test_locale_bucket")
TAAR_LOCALE_KEY = config("TAAR_LOCALE_KEY", default="test_locale_key")
TAAR_SIMILARITY_BUCKET = config("TAAR_SIMILARITY_BUCKET", default="test_similarity_bucket")
TAAR_SIMILARITY_DONOR_KEY = config("TAAR_SIMILARITY_DONOR_KEY", default="test_similarity_donor_key")
TAAR_SIMILARITY_LRCURVES_KEY = config("TAAR_SIMILARITY_LRCURVES_KEY", default="test_similarity_lrcurves_key")
# TAAR-lite configuration below
TAARLITE_GUID_COINSTALL_BUCKET = config("TAARLITE_GUID_COINSTALL_BUCKET", "telemetry-parquet")
TAARLITE_GUID_COINSTALL_KEY = config("TAARLITE_GUID_COINSTALL_KEY", "taar/lite/guid_coinstallation.json")
TAARLITE_GUID_RANKING_KEY = config("TAARLITE_GUID_RANKING_KEY", "taar/lite/guid_install_ranking.json")
TAARLITE_TRUNCATE = config("TAARLITE_TRUNCATE", AppSettings.TAARLITE_MAX_RESULTS * 5, cast=int)
# TAAR-lite configuration below
class RedisCacheSettings(DefaultCacheSettings):
# 4 hour lifetime for TAARLITE data
TAARLITE_TTL = config("TAARLITE_TTL", 60 * 60 * 4, cast=int)
TAARLITE_GUID_COINSTALL_BUCKET = config(
"TAARLITE_GUID_COINSTALL_BUCKET", "telemetry-parquet"
)
TAARLITE_GUID_COINSTALL_KEY = config(
"TAARlLITE_GUID_COINSTALL_KEY", "taar/lite/guid_coinstallation.json"
)
# TAARlite needs redis-backed mutexes to protect critical sections
# Set a default TAARLite mutex TTL of 1 hour to fully populate the
# redis cache
TAARLITE_MUTEX_TTL = config("TAARLITE_MUTEX_TTL", 60 * 60, cast=int)
TAARLITE_GUID_RANKING_KEY = config(
"TAARLITE_GUID_RANKING_KEY", "taar/lite/guid_install_ranking.json"
)
# 4 hour lifetime for TAARLITE data
TAARLITE_TTL = config("TAARLITE_TTL", 60 * 60 * 4, cast=int)
REDIS_HOST = config("REDIS_HOST", "localhost", cast=str)
REDIS_PORT = config("REDIS_PORT", 6379, cast=int)
# TAARlite needs redis-backed mutexes to protect critical sections
TAARLITE_MUTEX_TTL = config("TAARLITE_MUTEX_TTL", 60 * 60, cast=int)
class PackageCacheSettings(DefaultCacheSettings):
TAAR_LOCALE_BUCKET = 'moz-fx-data-taar-pr-prod-e0f7-prod-models'
TAAR_LOCALE_KEY = 'taar/locale/top10_dict.json.bz2'
TAAR_SIMILARITY_BUCKET = 'moz-fx-data-taar-pr-prod-e0f7-prod-models'
TAAR_SIMILARITY_DONOR_KEY = 'taar/similarity/donors.json.bz2'
TAAR_SIMILARITY_LRCURVES_KEY = 'taar/similarity/lr_curves.json.bz2'
# Set a default TAARLite mutex TTL of 1 hour to fully populate the
# redis cache
TAARLITE_MUTEX_TTL = config("TAARLITE_MUTEX_TTL", 60 * 60, cast=int)
TAAR_ITEM_MATRIX_BUCKET = 'moz-fx-data-taar-pr-prod-e0f7-prod-models'
TAAR_ITEM_MATRIX_KEY = 'addon_recommender/item_matrix.json.bz2'
TAAR_ADDON_MAPPING_BUCKET = 'moz-fx-data-taar-pr-prod-e0f7-prod-models'
TAAR_ADDON_MAPPING_KEY = 'addon_recommender/addon_mapping.json.bz2'
TAARLITE_TRUNCATE = config("TAARLITE_TRUNCATE", TAARLITE_MAX_RESULTS * 5, cast=int)
# Bigtable config
BIGTABLE_PROJECT_ID = config(
"BIGTABLE_PROJECT_ID", default="cfr-personalization-experiment"
)
BIGTABLE_INSTANCE_ID = config("BIGTABLE_INSTANCE_ID", default="taar-profile")
BIGTABLE_TABLE_ID = config("BIGTABLE_TABLE_ID", default="taar_profile")
DISABLE_TAAR_LITE = True
DISABLE_ENSEMBLE = True
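With the settings grouped into classes, python-decouple's config() still resolves each value from the environment when the class body executes, falling back to the declared default, while PackageCacheSettings pins the GCS buckets as constants and disables the redis-only features for the packaged Ensemble spark job. A usage sketch with hypothetical environment values:

    # REDIS_HOST=redis.internal TAARLITE_TTL=7200 python app.py
    from taar.settings import PackageCacheSettings, RedisCacheSettings

    print(RedisCacheSettings.REDIS_HOST)          # "redis.internal"
    print(RedisCacheSettings.TAARLITE_TTL)        # 7200
    print(PackageCacheSettings.DISABLE_ENSEMBLE)  # True, hard-coded

Because config() runs at class-definition time, the environment must be set before taar.settings is first imported.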

View file

@ -3,7 +3,7 @@ These are global fixtures automagically loaded by pytest
"""
import pytest
from taar.context import _default_context
from taar.context import app_context
FAKE_LOCALE_DATA = {
"te-ST": [
@ -16,9 +16,9 @@ FAKE_LOCALE_DATA = {
}
@pytest.fixture
@pytest.fixture(scope='function')
def test_ctx():
ctx = _default_context()
ctx = app_context()
return ctx

View file

@ -4,17 +4,17 @@ Noop helpers
"""
import mock
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.redis_cache import TAARCacheRedis
def noop_taarlite_dataload(stack):
# no-op the taarlite rankdata
stack.enter_context(
mock.patch.object(TAARCache, "_update_rank_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_rank_data", return_value=None)
)
# no-op the taarlite guidguid data
stack.enter_context(
mock.patch.object(TAARCache, "_update_coinstall_data", return_value=None,)
mock.patch.object(TAARCacheRedis, "_update_coinstall_data", return_value=None, )
)
return stack
@ -22,7 +22,7 @@ def noop_taarlite_dataload(stack):
def noop_taarlocale_dataload(stack):
# no-op the taar locale data
stack.enter_context(
mock.patch.object(TAARCache, "_update_locale_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_locale_data", return_value=None)
)
return stack
@ -30,7 +30,7 @@ def noop_taarlocale_dataload(stack):
def noop_taarcollab_dataload(stack):
# no-op the taar collab
stack.enter_context(
mock.patch.object(TAARCache, "_update_collab_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_collab_data", return_value=None)
)
return stack
@ -38,7 +38,7 @@ def noop_taarcollab_dataload(stack):
def noop_taarsimilarity_dataload(stack):
# no-op the taar similarity data
stack.enter_context(
mock.patch.object(TAARCache, "_update_similarity_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_similarity_data", return_value=None)
)
return stack
@ -46,9 +46,9 @@ def noop_taarsimilarity_dataload(stack):
def noop_taarensemble_dataload(stack):
# no-op the taar ensemble data
stack.enter_context(
mock.patch.object(TAARCache, "_update_ensemble_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_ensemble_data", return_value=None)
)
stack.enter_context(
mock.patch.object(TAARCache, "_update_whitelist_data", return_value=None)
mock.patch.object(TAARCacheRedis, "_update_whitelist_data", return_value=None)
)
return stack

View file

@ -6,20 +6,16 @@
Test cases for the TAAR CollaborativeRecommender
"""
import numpy
import contextlib
import fakeredis
import mock
import contextlib
from taar.recommenders.redis_cache import TAARCache
import numpy
from taar.interfaces import ITAARCache
from taar.recommenders.collaborative_recommender import CollaborativeRecommender
from taar.recommenders.collaborative_recommender import positive_hash
from markus import TIMING
from markus.testing import MetricsMock
from taar.recommenders.redis_cache import TAARCacheRedis
from .noop_fixtures import (
noop_taarlocale_dataload,
noop_taarlite_dataload,
@ -27,7 +23,6 @@ from .noop_fixtures import (
noop_taarsimilarity_dataload,
)
"""
We need to generate a synthetic list of addons and relative weights
for co-occurrence. It's important to note that the
@ -51,16 +46,16 @@ def mock_install_none_mock_data(ctx):
we always get 404 errors.
"""
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_collaborative_item_matrix", return_value="",
TAARCacheRedis, "_fetch_collaborative_item_matrix", return_value="",
)
)
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_collaborative_mapping_data", return_value="",
TAARCacheRedis, "_fetch_collaborative_mapping_data", return_value="",
)
)
@ -69,7 +64,7 @@ def mock_install_none_mock_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -80,7 +75,9 @@ def mock_install_none_mock_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
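Every fixture in these tests follows the recipe seen above: reset the singleton, patch the _fetch_* loaders to return canned data, patch init_redis_connections to hand back fakeredis databases, then load the cache and register it in the context. A condensed sketch of that shared shape (mock_cache and its fetch_overrides keyword are hypothetical helpers, not part of the commit):

    import contextlib

    import fakeredis
    import mock

    from taar.interfaces import ITAARCache
    from taar.recommenders.redis_cache import TAARCacheRedis

    @contextlib.contextmanager
    def mock_cache(ctx, **fetch_overrides):
        with contextlib.ExitStack() as stack:
            TAARCacheRedis._instance = None  # drop any singleton left by a prior test
            for method_name, canned in fetch_overrides.items():
                stack.enter_context(
                    mock.patch.object(TAARCacheRedis, method_name, return_value=canned)
                )
            stack.enter_context(
                mock.patch.object(
                    TAARCacheRedis,
                    "init_redis_connections",
                    return_value={i: fakeredis.FakeStrictRedis(db=i) for i in range(3)},
                )
            )
            cache = TAARCacheRedis.get_instance(ctx)
            cache.safe_load_data()
            ctx[ITAARCache] = cache
            yield stack

    # e.g. with mock_cache(ctx, _fetch_locale_data=FAKE_LOCALE_DATA): ...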
@ -109,17 +106,17 @@ def mock_install_mock_data(ctx):
fake_mapping[str(java_hash)] = addon
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"_fetch_collaborative_item_matrix",
return_value=fake_addon_matrix,
)
)
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"_fetch_collaborative_mapping_data",
return_value=fake_mapping,
)
@ -130,7 +127,7 @@ def mock_install_mock_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -141,7 +138,9 @@ def mock_install_mock_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -189,34 +188,30 @@ def test_empty_recommendations(test_ctx):
def test_best_recommendation(test_ctx):
with MetricsMock() as mm:
# Make sure the structure of the recommendations is correct and that we
# recommended the right addon.
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# Make sure the structure of the recommendations is correct and that we
# recommended the right addon.
with mock_install_mock_data(test_ctx):
r = CollaborativeRecommender(test_ctx)
# A non-empty set of addons should give a list of recommendations
fixture_client_data = {
"installed_addons": ["addon4.id"],
"client_id": "test_client",
}
assert r.can_recommend(fixture_client_data)
recommendations = r.recommend(fixture_client_data, 1)
# A non-empty set of addons should give a list of recommendations
fixture_client_data = {
"installed_addons": ["addon4.id"],
"client_id": "test_client",
}
assert r.can_recommend(fixture_client_data)
recommendations = r.recommend(fixture_client_data, 1)
assert isinstance(recommendations, list)
assert len(recommendations) == 1
assert isinstance(recommendations, list)
assert len(recommendations) == 1
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[0]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon2.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.3225"))
assert mm.has_record(TIMING, stat="taar.collaborative_recommend")
# Verify that addon2 - the most heavy weighted addon was
# recommended
result = recommendations[0]
assert type(result) is tuple
assert len(result) == 2
assert result[0] == "addon2.id"
assert type(result[1]) is numpy.float64
assert numpy.isclose(result[1], numpy.float64("0.3225"))
def test_recommendation_weights(test_ctx):

View file

@ -1,12 +1,12 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.interfaces import ITAARCache
from taar.recommenders.ensemble_recommender import EnsembleRecommender
import mock
import contextlib
import fakeredis
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.redis_cache import TAARCacheRedis
from .noop_fixtures import (
noop_taarlocale_dataload,
noop_taarcollab_dataload,
@ -59,14 +59,14 @@ def mock_install_mock_ensemble_data(ctx):
]
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(TAARCache, "_fetch_ensemble_weights", return_value=DATA,)
mock.patch.object(TAARCacheRedis, "_fetch_ensemble_weights", return_value=DATA, )
)
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_whitelist", return_value=WHITELIST_DATA,
TAARCacheRedis, "_fetch_whitelist", return_value=WHITELIST_DATA,
)
)
@ -75,7 +75,7 @@ def mock_install_mock_ensemble_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -86,7 +86,9 @@ def mock_install_mock_ensemble_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -133,6 +135,9 @@ def test_recommendations(test_ctx):
assert recommendation_list == EXPECTED_RESULTS
assert mm.has_record(TIMING, "taar.ensemble_recommend")
assert mm.has_record(TIMING, "taar.collaborative_recommend")
assert mm.has_record(TIMING, "taar.locale_recommend")
assert mm.has_record(TIMING, "taar.similarity_recommend")
def test_preinstalled_guids(test_ctx):

View file

@ -5,6 +5,7 @@ import pytest
import mock
import contextlib
from taar.interfaces import ITAARCache
from .noop_fixtures import (
noop_taarlocale_dataload,
noop_taarcollab_dataload,
@ -13,10 +14,9 @@ from .noop_fixtures import (
)
from taar.recommenders.guid_based_recommender import GuidBasedRecommender
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.redis_cache import NORMDATA_GUID_ROW_NORM_PREFIX
from taar.recommenders.redis_cache import TAARCacheRedis
from taar.recommenders.cache import NORMDATA_GUID_ROW_NORM_PREFIX
from taar.recommenders.ua_parser import parse_ua, OSNAME_TO_ID
@ -92,16 +92,16 @@ RESULTS = {
def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_ranking_data", return_value=mock_ranking,
TAARCacheRedis, "_fetch_ranking_data", return_value=mock_ranking,
)
)
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_coinstall_data", return_value=mock_coinstall,
TAARCacheRedis, "_fetch_coinstall_data", return_value=mock_coinstall,
)
)
@ -113,7 +113,7 @@ def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -124,7 +124,9 @@ def mock_coinstall_ranking_context(ctx, mock_coinstall, mock_ranking):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -267,7 +269,7 @@ def test_missing_rownorm_data_issue_31(
EXPECTED_RESULTS = RESULTS["rownorm_sum_tiebreak"]
# Explicitly destroy the guid-4 key in the row_norm data
recommender._redis_cache._db().delete(NORMDATA_GUID_ROW_NORM_PREFIX + "guid-4")
recommender._cache._db().delete(NORMDATA_GUID_ROW_NORM_PREFIX + "guid-4")
for i, row in enumerate(EXPECTED_RESULTS):
if row[0] == "guid-4":
del EXPECTED_RESULTS[i]
@ -293,7 +295,7 @@ def test_divide_by_zero_rownorm_data_issue_31(
# Explicitly set the guid-4 key in the row_norm data to have a sum
# of zero weights
recommender._redis_cache._db().set(
recommender._cache._db().set(
NORMDATA_GUID_ROW_NORM_PREFIX + "guid-4", json.dumps([0, 0, 0])
)

View file

@ -7,8 +7,8 @@ from flask import url_for
import pytest
from taar.settings import TAARLITE_MAX_RESULTS
from taar.context import default_context
from taar.settings import AppSettings
from taar.context import app_context
from .test_guid_based_recommender import mock_coinstall_ranking_context
try:
@ -23,7 +23,6 @@ def hasher(uuid):
@pytest.fixture
def app():
from taar.plugin import configure_plugin
from taar.plugin import PROXY_MANAGER
@ -99,7 +98,7 @@ class PlatformRecommendationManager(FakeRecommendationManager):
class ProfileFetcherEnabledRecommendationManager(FakeRecommendationManager):
def __init__(self, *args, **kwargs):
self._ctx = default_context()
self._ctx = app_context()
self._ctx["profile_fetcher"] = kwargs["profile_fetcher"]
super(ProfileFetcherEnabledRecommendationManager, self).__init__(args, kwargs)
@ -199,10 +198,7 @@ def test_locale_recommendation(client, locale_recommendation_manager):
def test_platform_recommendation(client, platform_recommendation_manager):
uri = (
url_for("recommendations", hashed_client_id=hasher(uuid.uuid4()))
+ "?platform=WOW64"
)
uri = url_for("recommendations", hashed_client_id=hasher(uuid.uuid4())) + "?platform=WOW64"
response = client.post(uri)
assert response.status_code == 200
assert response.headers["Content-Type"] == "application/json"
@ -253,7 +249,7 @@ def test_mixed_and_promoted_and_taar_adodns(client, static_recommendation_manage
def test_overlapping_mixed_and_promoted_and_taar_adodns(
client, static_recommendation_manager
client, static_recommendation_manager
):
"""
Test that we can provide addon suggestions that also get clobbered
@ -265,7 +261,7 @@ def test_overlapping_mixed_and_promoted_and_taar_adodns(
json=dict(
{
"options": {
"promoted": [["test-addon-1", 10], ["guid2", 5], ["guid55", 8],]
"promoted": [["test-addon-1", 10], ["guid2", 5], ["guid55", 8], ]
}
}
),
@ -273,7 +269,7 @@ def test_overlapping_mixed_and_promoted_and_taar_adodns(
)
# The result should order the GUIDs in descending order of weight
expected = {
"results": ["test-addon-1", "guid55", "guid2", "test-addon-2", "test-addon-N",]
"results": ["test-addon-1", "guid55", "guid2", "test-addon-2", "test-addon-N", ]
}
assert res.json == expected
@ -333,11 +329,10 @@ def test_taarlite(client, test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKI
"""
with mock_coinstall_ranking_context(
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
test_ctx, TAARLITE_MOCK_DATA, TAARLITE_MOCK_GUID_RANKING
):
url = url_for("taarlite_recommendations", guid="guid-1",)
url = url_for("taarlite_recommendations", guid="guid-1", )
res = client.get(url, follow_redirects=True)
assert len(res.json["results"]) == TAARLITE_MAX_RESULTS
assert len(res.json["results"]) == AppSettings.TAARLITE_MAX_RESULTS
assert res.json["results"] == ["guid-5", "guid-6", "guid-3", "guid-2"]

View file

@ -2,27 +2,24 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import boto3
import mock
import bz2
import contextlib
import json
import fakeredis
from taar.recommenders.redis_cache import TAARCache
import mock
from google.cloud import storage
from taar.interfaces import ITAARCache
from taar.recommenders.locale_recommender import LocaleRecommender
from taar.recommenders.redis_cache import TAARCacheRedis
from taar.settings import DefaultCacheSettings
from .noop_fixtures import (
noop_taarcollab_dataload,
noop_taarlite_dataload,
noop_taarsimilarity_dataload,
noop_taarensemble_dataload,
)
import json
from taar.recommenders import LocaleRecommender
from taar.settings import TAAR_LOCALE_KEY, TAAR_LOCALE_BUCKET
from markus import TIMING
from markus.testing import MetricsMock
FAKE_LOCALE_DATA = {
"te-ST": [
@ -37,12 +34,14 @@ FAKE_LOCALE_DATA = {
def install_mock_data(ctx):
ctx = ctx.child()
conn = boto3.resource("s3", region_name="us-west-2")
conn.create_bucket(Bucket=TAAR_LOCALE_BUCKET)
conn.Object(TAAR_LOCALE_BUCKET, TAAR_LOCALE_KEY).put(
Body=json.dumps(FAKE_LOCALE_DATA)
)
byte_data = json.dumps(FAKE_LOCALE_DATA).encode("utf8")
byte_data = bz2.compress(byte_data)
client = storage.Client()
bucket = client.get_bucket(DefaultCacheSettings.TAAR_LOCALE_BUCKET)
blob = bucket.blob(DefaultCacheSettings.TAAR_LOCALE_KEY)
blob.upload_from_string(byte_data)
return ctx
@ -50,10 +49,10 @@ def install_mock_data(ctx):
@contextlib.contextmanager
def mock_locale_data(ctx):
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_locale_data", return_value=FAKE_LOCALE_DATA,
TAARCacheRedis, "_fetch_locale_data", return_value=FAKE_LOCALE_DATA,
)
)
@ -65,7 +64,7 @@ def mock_locale_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -76,7 +75,9 @@ def mock_locale_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -110,25 +111,22 @@ def test_recommendations(test_ctx):
The JSON output for this recommender should be a list of 2-tuples
of (GUID, weight).
"""
with MetricsMock() as mm:
with mock_locale_data(test_ctx):
r = LocaleRecommender(test_ctx)
with mock_locale_data(test_ctx):
r = LocaleRecommender(test_ctx)
recommendations = r.recommend({"locale": "en"}, 10)
recommendations = r.recommend({"locale": "en"}, 10)
# Make sure the structure of the recommendations is correct and that we
# recommended the right addon.
assert isinstance(recommendations, list)
assert len(recommendations) == len(FAKE_LOCALE_DATA["en"])
# Make sure the structure of the recommendations is correct and that we
# recommended the right addon.
assert isinstance(recommendations, list)
assert len(recommendations) == len(FAKE_LOCALE_DATA["en"])
# Make sure that the reported addons are the one from the fake data.
for (addon_id, weight), (expected_id, expected_weight) in zip(
# Make sure that the reported addons are the one from the fake data.
for (addon_id, weight), (expected_id, expected_weight) in zip(
recommendations, FAKE_LOCALE_DATA["en"]
):
assert addon_id == expected_id
assert weight == expected_weight
assert mm.has_record(TIMING, "taar.locale_recommend")
):
assert addon_id == expected_id
assert weight == expected_weight
def test_recommender_extra_data(test_ctx):
@ -143,7 +141,7 @@ def test_recommender_extra_data(test_ctx):
# Make sure that the reported addons are the one from the fake data.
for (addon_id, weight), (expected_id, expected_weight) in zip(
data, FAKE_LOCALE_DATA[expected_locale]
data, FAKE_LOCALE_DATA[expected_locale]
):
assert addon_id == expected_id
assert weight == expected_weight

View file

@ -2,7 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from taar import ProfileFetcher
from taar.profile_fetcher import ProfileFetcher
from taar.profile_fetcher import BigTableProfileController
from google.cloud import bigtable
import copy

View file

@ -2,7 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from taar.recommenders import RecommendationManager
from taar.recommenders.recommendation_manager import RecommendationManager
from taar.recommenders.base_recommender import AbstractRecommender
from .noop_fixtures import (
@ -24,7 +24,7 @@ from markus.testing import MetricsMock
import mock
import contextlib
import fakeredis
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.redis_cache import TAARCacheRedis
@contextlib.contextmanager
@ -38,14 +38,14 @@ def mock_install_mock_curated_data(ctx):
}
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(TAARCache, "_fetch_whitelist", return_value=mock_data)
mock.patch.object(TAARCacheRedis, "_fetch_whitelist", return_value=mock_data)
)
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"_fetch_ensemble_weights",
return_value=mock_ensemble_weights,
)
@ -57,13 +57,13 @@ def mock_install_mock_curated_data(ctx):
stack = noop_taarsimilarity_dataload(stack)
stack.enter_context(
mock.patch.object(TAARCache, "_fetch_whitelist", return_value=mock_data)
mock.patch.object(TAARCacheRedis, "_fetch_whitelist", return_value=mock_data)
)
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -83,7 +83,7 @@ def mock_install_mock_curated_data(ctx):
ctx["recommender_factory"] = MockRecommenderFactory()
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
TAARCacheRedis.get_instance(ctx).safe_load_data()
yield stack

View file

@ -6,10 +6,10 @@ import json
import six
import logging
import numpy as np
import scipy.stats
from taar.interfaces import ITAARCache
from taar.recommenders.similarity_recommender import (
CATEGORICAL_FEATURES,
CONTINUOUS_FEATURES,
@ -19,9 +19,6 @@ from taar.recommenders.similarity_recommender import (
from .similarity_data import CONTINUOUS_FEATURE_FIXTURE_DATA
from .similarity_data import CATEGORICAL_FEATURE_FIXTURE_DATA
from markus import TIMING
from markus.testing import MetricsMock
import fakeredis
import mock
import contextlib
@ -31,7 +28,7 @@ from .noop_fixtures import (
noop_taarlocale_dataload,
noop_taarensemble_dataload,
)
from taar.recommenders.redis_cache import TAARCache
from taar.recommenders.redis_cache import TAARCacheRedis
def noop_loaders(stack):
@ -80,15 +77,14 @@ def generate_a_fake_taar_client():
@contextlib.contextmanager
def mock_install_no_data(ctx):
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(TAARCache, "_fetch_similarity_donors", return_value="",)
mock.patch.object(TAARCacheRedis, "_fetch_similarity_donors", return_value="", )
)
stack.enter_context(
mock.patch.object(TAARCache, "_fetch_similarity_lrcurves", return_value="",)
mock.patch.object(TAARCacheRedis, "_fetch_similarity_lrcurves", return_value="", )
)
stack = noop_loaders(stack)
@ -96,7 +92,7 @@ def mock_install_no_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -107,18 +103,19 @@ def mock_install_no_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@contextlib.contextmanager
def mock_install_categorical_data(ctx):
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"_fetch_similarity_donors",
return_value=CATEGORICAL_FEATURE_FIXTURE_DATA,
)
@ -126,7 +123,7 @@ def mock_install_categorical_data(ctx):
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"_fetch_similarity_lrcurves",
return_value=generate_fake_lr_curves(1000),
)
@ -136,7 +133,7 @@ def mock_install_categorical_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -147,7 +144,9 @@ def mock_install_categorical_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -157,16 +156,16 @@ def mock_install_continuous_data(ctx):
lrs_data = generate_fake_lr_curves(1000)
with contextlib.ExitStack() as stack:
TAARCache._instance = None
TAARCacheRedis._instance = None
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_similarity_donors", return_value=cts_data,
TAARCacheRedis, "_fetch_similarity_donors", return_value=cts_data,
)
)
stack.enter_context(
mock.patch.object(
TAARCache, "_fetch_similarity_lrcurves", return_value=lrs_data,
TAARCacheRedis, "_fetch_similarity_lrcurves", return_value=lrs_data,
)
)
stack = noop_loaders(stack)
@ -174,7 +173,7 @@ def mock_install_continuous_data(ctx):
# Patch fakeredis in
stack.enter_context(
mock.patch.object(
TAARCache,
TAARCacheRedis,
"init_redis_connections",
return_value={
0: fakeredis.FakeStrictRedis(db=0),
@ -185,7 +184,9 @@ def mock_install_continuous_data(ctx):
)
# Initialize redis
TAARCache.get_instance(ctx).safe_load_data()
cache = TAARCacheRedis.get_instance(ctx)
cache.safe_load_data()
ctx[ITAARCache] = cache
yield stack
@ -227,23 +228,20 @@ def test_can_recommend(test_ctx, caplog):
def test_recommendations(test_ctx):
with MetricsMock() as mm:
# Create a new instance of a SimilarityRecommender.
with mock_install_continuous_data(test_ctx):
r = SimilarityRecommender(test_ctx)
# Create a new instance of a SimilarityRecommender.
with mock_install_continuous_data(test_ctx):
r = SimilarityRecommender(test_ctx)
recommendation_list = r.recommend(generate_a_fake_taar_client(), 1)
recommendation_list = r.recommend(generate_a_fake_taar_client(), 1)
assert isinstance(recommendation_list, list)
assert len(recommendation_list) == 1
assert isinstance(recommendation_list, list)
assert len(recommendation_list) == 1
recommendation, weight = recommendation_list[0]
recommendation, weight = recommendation_list[0]
# Make sure that the reported addons are the expected ones from the most similar donor.
assert "{test-guid-1}" == recommendation
assert type(weight) == np.float64
assert mm.has_record(TIMING, stat="taar.similarity_recommend")
# Make sure that the reported addons are the expected ones from the most similar donor.
assert "{test-guid-1}" == recommendation
assert type(weight) == np.float64
def test_get_lr(test_ctx):