Merge branch 'staging' into jumin/dnn
This commit is contained in:
Коммит
d76ff8ff48
58
README.md
58
README.md
|
@ -1,10 +1,10 @@
|
|||
# Recommenders
|
||||
# Recommenders
|
||||
|
||||
This repository provides examples and best practices for building recommendation systems, provided as Jupyter notebooks. The examples detail our learnings on five key tasks:
|
||||
- [Prepare Data](notebooks/01_prepare_data/README.md): Preparing and loading data for each recommender algorithm
|
||||
- [Model](notebooks/02_model/README.md): Building models using various recommender algorithms such as Alternating Least Squares ([ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS)), Singular Value Decomposition ([SVD](https://surprise.readthedocs.io/en/stable/matrix_factorization.html#surprise.prediction_algorithms.matrix_factorization.SVD)), etc.
|
||||
- [Evaluate](notebooks/03_evaluate/README.md): Evaluating algorithms with offline metrics
|
||||
- [Model Select and Optimize](notebooks/04_model_select_and_optimize): Tuning and optimizing hyperparameteres for recommender models
|
||||
- [Model Select and Optimize](notebooks/04_model_select_and_optimize): Tuning and optimizing hyperparameters for recommender models
|
||||
- [Operationalize](notebooks/05_operationalize/README.md): Operationalizing models in a production environment on Azure
|
||||
|
||||
Several utilities are provided in [reco_utils](reco_utils) to support common tasks such as loading datasets in the format expected by different algorithms, evaluating model outputs, and splitting train/test data. Implementations of several state-of-the-art algorithms are provided for self-study and customization in your own applications.
|
||||
|
@ -18,31 +18,33 @@ To setup on your local machine:
|
|||
```
|
||||
git clone https://github.com/Microsoft/Recommenders
|
||||
```
|
||||
3. Run the generate conda file script and create a conda environment:
|
||||
3. Run the generate conda file script to create a conda environment:
|
||||
(This is for a basic python environment, see [SETUP.md](SETUP.md) for PySpark and GPU environment setup)
|
||||
```
|
||||
cd Recommenders
|
||||
./scripts/generate_conda_file.sh
|
||||
conda env create -n reco -f conda_bare.yaml
|
||||
python scripts/generate_conda_file.py
|
||||
conda env create -f reco_base.yaml
|
||||
```
|
||||
4. Activate the conda environment and register it with Jupyter:
|
||||
```
|
||||
conda activate reco
|
||||
python -m ipykernel install --user --name reco --display-name "Python (reco)"
|
||||
conda activate reco_base
|
||||
python -m ipykernel install --user --name reco_base --display-name "Python (reco)"
|
||||
```
|
||||
5. Start the Jupyter notebook server
|
||||
```
|
||||
cd notebooks
|
||||
jupyter notebook
|
||||
```
|
||||
5. Run the [SAR Python CPU Movielens](notebooks/00_quick_start/sar_python_cpu_movielens.ipynb) notebook under the 00_quick_start folder. Make sure to change the kernel to "Python (reco)".
|
||||
6. Run the [SAR Python CPU Movielens](notebooks/00_quick_start/sar_movielens.ipynb) notebook under the 00_quick_start folder. Make sure to change the kernel to "Python (reco)".
|
||||
|
||||
## Notebooks
|
||||
**NOTE** - The [Alternating Least Squares (ALS)](notebooks/00_quick_start/als_movielens.ipynb) notebooks require a PySpark environment to run. Please follow the steps in the [setup guide](SETUP.md#dependencies-setup) to run these notebooks in a PySpark environment.
|
||||
|
||||
<<<<<<< HEAD
|
||||
We provide several notebooks to show how recommendation algorithms can be designed, evaluated and operationalized.
|
||||
|
||||
- The [Quick-Start Notebooks](notebooks/00_quick_start) detail how you can quickly get up and run with state-of-the-art algorithms such as the Smart Adaptive Recommendation ([SAR](https://github.com/Microsoft/Product-Recommendations/blob/master/doc/sar.md)) algorithm and ALS algorithm.
|
||||
|
||||
- The [Data Preparation Notebook](notebooks/01_prepare_data) shows how to prepare and split data properly for recommendation systems.
|
||||
- The [Data Preparation Notebooks](notebooks/01_prepare_data) show how to prepare and split data properly for recommendation systems.
|
||||
|
||||
- The [Modeling Notebooks](notebooks/02_model) provide a deep dive into implementations of different recommender algorithms.
|
||||
|
||||
|
@ -56,26 +58,47 @@ We provide several notebooks to show how recommendation algorithms can be design
|
|||
The Quick-Start and Modeling notebooks showcase how to utilize the following algorithms to build a recommender system:
|
||||
|
||||
**Algorithms**
|
||||
=======
|
||||
## Algorithms
|
||||
>>>>>>> staging
|
||||
|
||||
The table below lists recommender algorithms available in the repository at the moment.
|
||||
|
||||
| Algorithm | Environment | Type | Description |
|
||||
| --- | --- | --- | --- |
|
||||
| **`Classic Recommenders`** |
|
||||
| [Surprise/Singular Value Decomposition (SVD)](notebooks/00_quick_start/sar_movielens.ipynb) | Python | Collaborative Filtering | General purpose algorithm for smaller datasets |
|
||||
| [Alternating Least Squares (ALS)](notebooks/00_quick_start/als_movielens.ipynb) | Spark | Collaborative | General purpose algorithm for larger datasets, optimized with Spark |
|
||||
| **`Microsoft Recommenders`** |
|
||||
<<<<<<< HEAD
|
||||
| **Classic Recommenders** |
|
||||
| [Surprise/Singular Value Decomposition (SVD)](notebooks/02_model/surprise_svd_deep_dive.ipynb) | Python | Collaborative Filtering | General purpose algorithm for smaller datasets |
|
||||
| [Alternating Least Squares (ALS)](notebooks/00_quick_start/als_movielens.ipynb) | Spark | Collaborative Filtering | General purpose algorithm for larger datasets, optimized with Spark |
|
||||
| **Microsoft Recommenders** |
|
||||
| [Smart Adaptive Recommendations (SAR)](notebooks/00_quick_start/sar_movielens.ipynb) | Python / Spark | Collaborative Filtering | Generalized algorithm utilizing item similarities and can easily adapt to new users |
|
||||
| [Vowpal Wabbit Family (VW)](notebooks/02_model/vowpal_wabbit_deep_dive.ipynb) | Python / Online | Collaborative, Content Based | Fast online learning algorithms, great for scenarios where user features / context are constantly changing, like real-time bidding |
|
||||
| [Vowpal Wabbit Family (VW)](notebooks/02_model/vowpal_wabbit_deep_dive.ipynb) | Python / Online | Collaborative, Content-based Filtering | Fast online learning algorithms, great for scenarios where user features / context are constantly changing, like real-time bidding |
|
||||
| [eXtreme Deep Factorization Machine (xDeepFM)](notebooks/00_quick_start/xdeepfm_synthetic.ipynb) | Python / GPU | Hybrid | Deep learning model combining implicit and explicit features |
|
||||
| [Deep Knowledge-Aware Network (DKN)](notebooks/00_quick_start/dkn_synthetic.ipynb) | Python / GPU | Content Based | Deep learning model incorporating a knowledge graph and article embeddings to provide powerful news or article recommendations |
|
||||
| **`Deep Learning`** |
|
||||
| [Deep Knowledge-Aware Network (DKN)](notebooks/00_quick_start/dkn_synthetic.ipynb) | Python / GPU | Content-based Filtering | Deep learning model incorporating a knowledge graph and article embeddings to provide powerful news or article recommendations |
|
||||
| **Deep Learning Recommenders** |
|
||||
| [Neural Collaborative Filtering (NCF)](notebooks/00_quick_start/ncf_movielens.ipynb) | Python / GPU | Collaborative Filtering | General algorithm built using a multi-layer perceptron |
|
||||
| [Restricted Boltzmann Machines (RBM)](notebooks/00_quick_start/rbm_movielens.ipynb) | Python / GPU | Collaborative Filtering | Generative neural network algorithm built to learn the underlying probability distribution for user/item affinity |
|
||||
| [FastAI Embedding Dot Bias (FAST)](notebooks/00_quick_start/fastai_movielens.ipynb) | Python / GPU | Collaborative Filtering | General purpose algorithm embedding dot biases for users and items |
|
||||
|
||||
In addition, we also provide a [comparison notebook](notebooks/03_evaluate/comparison.ipynb) to illustrate how different algorithms could be evaluated and compared. In this notebook, data (MovieLens 1M) is randomly split into train/test sets at a 75/25 ratio. A recommendation model is trained using each of the collaborative filtering algorithms below. We utilize empirical parameter values reported in literature [here](http://mymedialite.net/examples/datasets.html). For ranking metrics we use k = 10 (top 10 results). We run the comparison on a Standard NC6s_v2 [Azure DSVM](https://azure.microsoft.com/en-us/services/virtual-machines/data-science-virtual-machines/) (6 vCPUs, 112 GB memory and 1 K80 GPU). Spark ALS is run in local standalone mode.
|
||||
=======
|
||||
| [Smart Adaptive Recommendations (SAR)<sup>*</sup>](notebooks/00_quick_start/sar_movielens.ipynb) | CPU-only | Collaborative Filtering | Similarity-based algorithm for implicit feedback dataset |
|
||||
| [Vowpal Wabbit Family (VW)<sup>*</sup>](notebooks/02_model/vowpal_wabbit_deep_dive.ipynb) | CPU-only (train online) | Collaborative, Content-based Filtering | Fast online learning algorithms, great for scenarios where user features / context are constantly changing, like real-time bidding |
|
||||
| [eXtreme Deep Factorization Machine (xDeepFM)<sup>*</sup>](notebooks/00_quick_start/xdeepfm_synthetic.ipynb) | CPU / GPU | Hybrid | Deep learning based algorithm for implicit and explicit feedback with user/item features |
|
||||
| [Deep Knowledge-Aware Network (DKN)<sup>*</sup>](notebooks/00_quick_start/dkn_synthetic.ipynb) | CPU / GPU | Content-based Filtering | Deep learning algorithm incorporating a knowledge graph and article embeddings to provide powerful news or article recommendations |
|
||||
| [Surprise/Singular Value Decomposition (SVD)](notebooks/00_quick_start/sar_movielens.ipynb) | CPU-only | Collaborative Filtering | Matrix factorization algorithm for predicting explicit rating feedback in small datasets |
|
||||
| [Alternating Least Squares (ALS)](notebooks/00_quick_start/als_movielens.ipynb) | Spark | Collaborative Filtering | Matrix factorization algorithm for explicit or implicit feedback in large datasets, optimized with Spark for scalability and distributed computing capability |
|
||||
| [Neural Collaborative Filtering (NCF)](notebooks/00_quick_start/ncf_movielens.ipynb) | CPU / GPU | Collaborative Filtering | Deep learning algorithm with enhanced performance for implicit feedback |
|
||||
| [Restricted Boltzmann Machines (RBM)](notebooks/00_quick_start/rbm_movielens.ipynb) | CPU / GPU | Collaborative Filtering | Neural network based algorithm to learn the underlying probability distribution for explicit or implicit feedback |
|
||||
| [FastAI Embedding Dot Bias (FAST)](notebooks/00_quick_start/fastai_movielens.ipynb) | CPU / GPU | Collaborative Filtering | General purpose algorithm with embedding dot biases for users and items |
|
||||
|
||||
**NOTE** - "<sup>*</sup>" indicates algorithms invented/contributed by Microsoft.
|
||||
>>>>>>> staging
|
||||
|
||||
**Preliminary Comparison**
|
||||
|
||||
We provide a [comparison notebook](notebooks/03_evaluate/comparison.ipynb) to illustrate how different algorithms could be evaluated and compared. In this notebook, data (MovieLens 1M) is randomly split into train/test sets at a 75/25 ratio. A recommendation model is trained using each of the collaborative filtering algorithms below. We utilize empirical parameter values reported in literature [here](http://mymedialite.net/examples/datasets.html). For ranking metrics we use k = 10 (top 10 results). We run the comparison on a Standard NC6s_v2 [Azure DSVM](https://azure.microsoft.com/en-us/services/virtual-machines/data-science-virtual-machines/) (6 vCPUs, 112 GB memory and 1 K80 GPU). Spark ALS is run in local standalone mode.
|
||||
|
||||
| Algo | MAP | nDCG@k | Precision@k | Recall@k | RMSE | MAE | R<sup>2</sup> | Explained Variance |
|
||||
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
|
||||
| [ALS](notebooks/00_quick_start/als_movielens.ipynb) | 0.002020 | 0.024313 | 0.030677 | 0.009649 | 0.860502 | 0.680608 | 0.406014 | 0.411603 |
|
||||
|
@ -83,7 +106,6 @@ In addition, we also provide a [comparison notebook](notebooks/03_evaluate/compa
|
|||
| [FastAI](notebooks/00_quick_start/fastai_movielens.ipynb) | 0.023022 |0.168714 |0.154761 |0.050153 |0.887224 |0.705609 |0.371552 |0.374281 |
|
||||
|
||||
|
||||
|
||||
## Contributing
|
||||
This project welcomes contributions and suggestions. Before contributing, please see our [contribution guidelines](CONTRIBUTING.md).
|
||||
|
||||
|
|
32
SETUP.md
32
SETUP.md
|
@ -34,19 +34,21 @@ Currently, this repository supports the following environments:
|
|||
|
||||
### Setup Requirements
|
||||
|
||||
* Anaconda with Python version >= 3.6. [Miniconda](https://conda.io/miniconda.html) is the fastest way to get started.
|
||||
* The Python library dependencies can be found in this [script](scripts/generate_conda_file.sh).
|
||||
* Machine with Spark (optional for Python environment but mandatory for PySpark environment).
|
||||
* Machine running Linux, Windows Subsystem for Linux ([WSL](https://docs.microsoft.com/en-us/windows/wsl/about)) or macOS
|
||||
* Anaconda with Python version >= 3.6.
|
||||
* This is pre-installed on Azure DSVM, for local setup [Miniconda](https://docs.conda.io/en/latest/miniconda.html) is a quick way to get started.
|
||||
* [Apache Spark](https://spark.apache.org/downloads.html) (this is only needed for the PySpark environment).
|
||||
|
||||
### Dependencies setup
|
||||
|
||||
We install the dependencies with Conda. As a pre-requisite, we may want to make sure that Conda is up-to-date:
|
||||
We install the dependencies with Conda. As a pre-requisite, we want to make sure that Anaconda and the package manager Conda are both up to date:
|
||||
|
||||
```{shell}
|
||||
conda update conda
|
||||
conda update anaconda
|
||||
```
|
||||
|
||||
We provide a script to [generate a conda file](scripts/generate_conda_file.sh), depending of the environment we want to use. This will create the environment using the Python version 3.6 with all the correct dependencies.
|
||||
We provide a script, [generate_conda_file.py](scripts/generate_conda_file.py), to generate a conda file, depending of the environment we want to use. This will create the environment using the Python version 3.6 with all the correct dependencies.
|
||||
|
||||
To install each environment, first we need to generate a conda yaml file and then install the environment. We can specify the environment name with the input `-n`.
|
||||
|
||||
|
@ -58,8 +60,8 @@ Click on the following menus to see more details:
|
|||
Assuming the repo is cloned as `Recommenders` in the local system, to install the Python CPU environment:
|
||||
|
||||
cd Recommenders
|
||||
./scripts/generate_conda_file.sh
|
||||
conda env create -n reco_bare -f conda_bare.yaml
|
||||
python scripts/generate_conda_file.py
|
||||
conda env create -f reco_base.yaml
|
||||
|
||||
</details>
|
||||
|
||||
|
@ -69,8 +71,8 @@ Assuming the repo is cloned as `Recommenders` in the local system, to install th
|
|||
Assuming that you have a GPU machine, to install the Python GPU environment, which by default installs the CPU environment:
|
||||
|
||||
cd Recommenders
|
||||
./scripts/generate_conda_file.sh --gpu
|
||||
conda env create -n reco_gpu -f conda_gpu.yaml
|
||||
python scripts/generate_conda_file.py --gpu
|
||||
conda env create -f reco_gpu.yaml
|
||||
|
||||
</details>
|
||||
|
||||
|
@ -80,14 +82,14 @@ Assuming that you have a GPU machine, to install the Python GPU environment, whi
|
|||
To install the PySpark environment, which by default installs the CPU environment:
|
||||
|
||||
cd Recommenders
|
||||
./scripts/generate_conda_file.sh --pyspark
|
||||
conda env create -n reco_pyspark -f conda_pyspark.yaml
|
||||
python scripts/generate_conda_file.py --pyspark
|
||||
conda env create -f reco_pyspark.yaml
|
||||
|
||||
Additionally, if you want to test a particular version of spark, you may pass the --pyspark-version argument:
|
||||
|
||||
./scripts/generate_conda_file.sh --pyspark-version 2.4.0
|
||||
python scripts/generate_conda_file.py --pyspark-version 2.4.0
|
||||
|
||||
**NOTE** - for this environment, we need to set the environment variables `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` to point to the conda python executable.
|
||||
**NOTE** - for a PySpark environment, we need to set the environment variables `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` to point to the conda python executable.
|
||||
|
||||
To set these variables every time the environment is activated, we can follow the steps of this [guide](https://conda.io/docs/user-guide/tasks/manage-environments.html#macos-and-linux). Assuming that we have installed the environment in `/anaconda/envs/reco_pyspark`, we create the file `/anaconda/envs/reco_pyspark/etc/conda/activate.d/env_vars.sh` and add:
|
||||
|
||||
|
@ -112,8 +114,8 @@ unset PYSPARK_DRIVER_PYTHON
|
|||
To install all three environments:
|
||||
|
||||
cd Recommenders
|
||||
./scripts/generate_conda_file.sh --gpu --pyspark
|
||||
conda env create -n reco_full -f conda_full.yaml
|
||||
python scripts/generate_conda_file.py --gpu --pyspark
|
||||
conda env create -f reco_full.yaml
|
||||
|
||||
</details>
|
||||
|
||||
|
|
|
@ -13,11 +13,18 @@
|
|||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Running ALS on MovieLens (pySpark)\n",
|
||||
"# Running ALS on MovieLens (PySpark)\n",
|
||||
"\n",
|
||||
"Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well known collaborative filtering algorithm.\n",
|
||||
"\n",
|
||||
"This notebook provides an example of how to utilize and evaluate ALS pySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets. We use a smaller dataset in this example to run ALS efficiently on multiple cores of a [Data Science Virtual Machine](https://azure.microsoft.com/en-gb/services/virtual-machines/data-science-virtual-machines/)."
|
||||
"This notebook provides an example of how to utilize and evaluate ALS PySpark ML (DataFrame-based API) implementation, meant for large-scale distributed datasets. We use a smaller dataset in this example to run ALS efficiently on multiple cores of a [Data Science Virtual Machine](https://azure.microsoft.com/en-gb/services/virtual-machines/data-science-virtual-machines/)."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -84,7 +84,7 @@
|
|||
"import papermill as pm\n",
|
||||
"\n",
|
||||
"from reco_utils.recommender.rbm.rbm import RBM\n",
|
||||
"from reco_utils.dataset.numpy_splitters import numpy_stratified_split\n",
|
||||
"from reco_utils.dataset.python_splitters import numpy_stratified_split\n",
|
||||
"from reco_utils.dataset.sparse import AffinityMatrix\n",
|
||||
"\n",
|
||||
"\n",
|
||||
|
|
|
@ -78,6 +78,13 @@
|
|||
"In the following code, the Movielens-100K dataset is used to illustrate the ALS algorithm in Spark."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
|
|
|
@ -116,7 +116,7 @@
|
|||
"\n",
|
||||
"### 1.2 The MLP model\n",
|
||||
"\n",
|
||||
"NCF adopts two pathways to model users and items: 1) element-wise product of vectors, 2) concatenation of vectors. To learn interactions after concatenating of users and items lantent features, the standard MLP model is applied. In this sense, we can endow the model a large level of flexibility and non-linearity to learn the interactions between $p_{u}$ and $q_{i}$. The details of MLP model are:\n",
|
||||
"NCF adopts two pathways to model users and items: 1) element-wise product of vectors, 2) concatenation of vectors. To learn interactions after concatenating of users and items latent features, the standard MLP model is applied. In this sense, we can endow the model a large level of flexibility and non-linearity to learn the interactions between $p_{u}$ and $q_{i}$. The details of MLP model are:\n",
|
||||
"\n",
|
||||
"For the input layer, there is concatention of user and item vectors:\n",
|
||||
"\n",
|
||||
|
@ -134,7 +134,7 @@
|
|||
"\\hat { r } _ { u , i } = \\sigma \\left( h ^ { T } \\phi \\left( z _ { L - 1 } \\right) \\right)\n",
|
||||
"$$\n",
|
||||
"\n",
|
||||
"where ${ W }_{ l }$, ${ b }_{ l }$, and ${ a }_{ out }$ denote the weight matrix, bias vector, and activation function for the $l$-th layer’s perceptron, respectively. For activation functions of MLP layers, one can freely choose sigmoid, hyperbolic tangent (tanh), and Rectifier (ReLU), among others. Because of implicit data task, the activation function of the output layer is defined as sigmoid $\\sigma(x)=\\frac{1}{1+\\exp{(-x)}}$ to restrict the predicted score to be in (0,1).\n",
|
||||
"where ${ W }_{ l }$, ${ b }_{ l }$, and ${ a }_{ out }$ denote the weight matrix, bias vector, and activation function for the $l$-th layer’s perceptron, respectively. For activation functions of MLP layers, one can freely choose sigmoid, hyperbolic tangent (tanh), and Rectifier (ReLU), among others. Because of implicit data task, the activation function of the output layer is defined as sigmoid $\\sigma(x)=\\frac{1}{1+e^{-x}}$ to restrict the predicted score to be in (0,1).\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"### 1.3 Fusion of GMF and MLP\n",
|
||||
|
@ -159,11 +159,11 @@
|
|||
"\n",
|
||||
"$$P \\left( \\mathcal { R } , \\mathcal { R } ^ { - } | \\mathbf { P } , \\mathbf { Q } , \\Theta \\right) = \\prod _ { ( u , i ) \\in \\mathcal { R } } \\hat { r } _ { u , i } \\prod _ { ( u , j ) \\in \\mathcal { R } ^{ - } } \\left( 1 - \\hat { r } _ { u , j } \\right)$$\n",
|
||||
"\n",
|
||||
"Where $\\mathcal{R}$ denotes the set of observed interactions, and $\\mathcal{ R } ^ { - }$ denotes the set of negative instances. $\\mathbf{P}$ and $\\mathbf{Q}$ denotes the latent factor matrix for users and items, respectively; and $\\Theta$ denotes the model parameters. Taking the negative logarithm of the likelihood, we obatain the objective function to minimize for NCF method, which is known as *binary cross-entropy loss*:\n",
|
||||
"Where $\\mathcal{R}$ denotes the set of observed interactions, and $\\mathcal{ R } ^ { - }$ denotes the set of negative instances. $\\mathbf{P}$ and $\\mathbf{Q}$ denotes the latent factor matrix for users and items, respectively; and $\\Theta$ denotes the model parameters. Taking the negative logarithm of the likelihood, we obatain the objective function to minimize for NCF method, which is known as [binary cross-entropy loss](https://en.wikipedia.org/wiki/Cross_entropy):\n",
|
||||
"\n",
|
||||
"$$L = - \\sum _ { ( u , i ) \\in \\mathcal { R } \\cup { \\mathcal { R } } ^ { - } } r _ { u , i } \\log \\hat { r } _ { u , i } + \\left( 1 - r _ { u , i } \\right) \\log \\left( 1 - \\hat { r } _ { u , i } \\right)$$\n",
|
||||
"\n",
|
||||
"The optimization can be done by performing Stochastic Gradient Descent (SGD), which has been introduced by the SVD algorithm in surprise svd deep dive notebook. Our SGD method is very similar to the SVD algorithm's."
|
||||
"The optimization can be done by performing Stochastic Gradient Descent (SGD), which is described in the [Surprise SVD deep dive notebook](../02_model/surprise_svd_deep_dive.ipynb). Our SGD method is very similar to the SVD algorithm's."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -91,7 +91,7 @@
|
|||
"\n",
|
||||
"#RBM \n",
|
||||
"from reco_utils.recommender.rbm.rbm import RBM\n",
|
||||
"from reco_utils.dataset.numpy_splitters import numpy_stratified_split\n",
|
||||
"from reco_utils.dataset.python_splitters import numpy_stratified_split\n",
|
||||
"from reco_utils.dataset.sparse import AffinityMatrix\n",
|
||||
"\n",
|
||||
"#Evaluation libraries\n",
|
||||
|
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -1479,7 +1479,7 @@
|
|||
"source": [
|
||||
"|Approach|Distributed (on Spark)|Param sampling|Advanced hyperparam searching algo|Custom evaluation metrics|Custom data split|\n",
|
||||
"|---------|-------------|--------------|--------------------------|--------------|------------|\n",
|
||||
"|AML Services|Parallelizing Spark sessions on multi-node cluster or single Spark session on one VM node.)|Random, Grid, Bayesian sampling for discrete and continuous variables.|Bandit policy, Median stopping policy, and truncation selection policy.|Yes|Yes|\n",
|
||||
"|AzureML Services|Parallelizing Spark sessions on multi-node cluster or single Spark session on one VM node.)|Random, Grid, Bayesian sampling for discrete and continuous variables.|Bandit policy, Median stopping policy, and truncation selection policy.|Yes|Yes|\n",
|
||||
"|Spark native construct|Distributed in single-node standalone Spark environment or multi-node Spark cluster.|No|No|Need to re-engineer Spark modules|Need to re-engineer Spark modules.|\n",
|
||||
"|`hyperopt`|No (only support parallelization on MongoDB)|Random sampling for discrete and continuous variables.|Tree Parzen Estimator|Yes|Yes|"
|
||||
]
|
||||
|
|
|
@ -200,7 +200,7 @@
|
|||
"metadata": {},
|
||||
"source": [
|
||||
"### 1.1 Import or create the AzureML Workspace. \n",
|
||||
"This command will check if the AML Workspace exists or not, and will create the workspace if it doesn't exist."
|
||||
"This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
|
|
@ -7,6 +7,7 @@ DEFAULT_ITEM_COL = "itemID"
|
|||
DEFAULT_RATING_COL = "rating"
|
||||
DEFAULT_TIMESTAMP_COL = "timestamp"
|
||||
PREDICTION_COL = "prediction"
|
||||
DEFAULT_PREDICTION_COL = PREDICTION_COL
|
||||
|
||||
# Filtering variables
|
||||
DEFAULT_K = 10
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
"""
|
||||
Collection of numpy based splitters
|
||||
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def numpy_stratified_split(X, ratio=0.75, seed=123):
|
||||
|
||||
"""
|
||||
Split the user/item affinity matrix into train and test set matrices while mantaining
|
||||
local (i.e. per user) ratios.
|
||||
|
||||
Args:
|
||||
X (np.array, int): a sparse matrix
|
||||
ratio (scalar, float): fraction of the entire dataset to constitute the train set
|
||||
seed (scalar, int): random seed
|
||||
|
||||
Returns:
|
||||
Xtr (np.array, int): train set user/item affinity matrix
|
||||
Xtst (np.array, int): test set user/item affinity matrix
|
||||
|
||||
Basic mechanics:
|
||||
Main points :
|
||||
|
||||
1. In a typical recommender problem, different users rate a different number of items,
|
||||
and therefore the user/affinity matrix has a sparse structure with variable number
|
||||
of zeroes (unrated items) per row (user). Cutting a total amount of ratings will
|
||||
result in a non-homogenou distribution between train and test set, i.e. some test
|
||||
users may have many ratings while other very little if none.
|
||||
|
||||
2. In an unsupervised learning problem, no explicit answer is given. For this reason
|
||||
the split needs to be implemented in a different way then in supervised learningself.
|
||||
In the latter, one typically split the dataset by rows (by examples), ending up with
|
||||
the same number of feautures but different number of examples in the train/test setself.
|
||||
This scheme does not work in the unsupervised case, as part of the rated items needs to
|
||||
be used as a test set for fixed number of users.
|
||||
|
||||
Solution:
|
||||
|
||||
1. Instead of cutting a total percentage, for each user we cut a relative ratio of the rated
|
||||
items. For example, if user1 has rated 4 items and user2 10, cutting 25% will correspond to
|
||||
1 and 2.6 ratings in the test set, approximated as 1 and 3 according to the round() function.
|
||||
In this way, the 0.75 ratio is satified both locally and globally, preserving the original
|
||||
distribution of ratings across the train and test set.
|
||||
|
||||
2. It is easy (and fast) to satisfy this requirements by creating the test via element subtraction
|
||||
from the original datatset X. We first create two copies of X; for each user we select a random
|
||||
sample of local size ratio (point 1) and erase the remaining ratings, obtaining in this way the
|
||||
train set matrix Xtst. The train set matrix is obtained in the opposite way.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
np.random.seed(seed) # set the random seed
|
||||
|
||||
test_cut = int((1 - ratio) * 100) # percentage of ratings to go in the test set
|
||||
|
||||
# initialize train and test set matrices
|
||||
Xtr = X.copy()
|
||||
Xtst = X.copy()
|
||||
|
||||
# find the number of rated movies per user
|
||||
rated = np.sum(Xtr != 0, axis=1)
|
||||
|
||||
# for each user, cut down a test_size% for the test set
|
||||
tst = np.around((rated * test_cut) / 100).astype(int)
|
||||
|
||||
Nusers, Nitems = X.shape # total number of users and items
|
||||
|
||||
for u in range(Nusers):
|
||||
# For each user obtain the index of rated movies
|
||||
idx = np.asarray(np.where(Xtr[u] != 0))[0].tolist()
|
||||
|
||||
# extract a random subset of size n from the set of rated movies without repetition
|
||||
idx_tst = np.random.choice(idx, tst[u], replace=False)
|
||||
idx_train = list(set(idx).difference(set(idx_tst)))
|
||||
|
||||
Xtr[
|
||||
u, idx_tst
|
||||
] = 0 # change the selected rated movies to unrated in the train set
|
||||
Xtst[
|
||||
u, idx_train
|
||||
] = 0 # set the movies that appear already in the train set as 0
|
||||
|
||||
del idx, idx_train, idx_tst
|
||||
|
||||
return Xtr, Xtst
|
|
@ -1,6 +1,6 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from sklearn.model_selection import train_test_split as sk_split
|
||||
|
||||
|
@ -72,21 +72,26 @@ def python_chrono_split(
|
|||
Returns:
|
||||
list: Splits of the input data as pd.DataFrame.
|
||||
"""
|
||||
# A few preliminary checks.
|
||||
if not (filter_by == "user" or filter_by == "item"):
|
||||
raise ValueError("filter_by should be either 'user' or 'item'.")
|
||||
|
||||
if min_rating < 1:
|
||||
raise ValueError("min_rating should be integer and larger than or equal to 1.")
|
||||
|
||||
if col_user not in data.columns:
|
||||
raise ValueError("Schema of data not valid. Missing User Col")
|
||||
|
||||
if col_item not in data.columns:
|
||||
raise ValueError("Schema of data not valid. Missing Item Col")
|
||||
|
||||
if col_timestamp not in data.columns:
|
||||
raise ValueError("Schema of data not valid. Missing Timestamp Col")
|
||||
|
||||
multi_split, ratio = process_split_ratio(ratio)
|
||||
|
||||
split_by_column = col_user if filter_by == "user" else col_item
|
||||
|
||||
# Sort data by timestamp.
|
||||
data = data.sort_values(
|
||||
by=[split_by_column, col_timestamp], axis=0, ascending=False
|
||||
)
|
||||
|
||||
ratio = ratio if multi_split else [ratio, 1 - ratio]
|
||||
|
||||
if min_rating > 1:
|
||||
|
@ -98,17 +103,26 @@ def python_chrono_split(
|
|||
col_item=col_item,
|
||||
)
|
||||
|
||||
num_of_splits = len(ratio)
|
||||
splits = [pd.DataFrame({})] * num_of_splits
|
||||
# Split by each group and aggregate splits together.
|
||||
splits = []
|
||||
df_grouped = data.sort_values(col_timestamp).groupby(split_by_column)
|
||||
for name, group in df_grouped:
|
||||
group_splits = split_pandas_data_with_ratios(
|
||||
df_grouped.get_group(name), ratio, resample=False
|
||||
)
|
||||
for x in range(num_of_splits):
|
||||
splits[x] = pd.concat([splits[x], group_splits[x]])
|
||||
|
||||
return splits
|
||||
# Concatenate the list of split dataframes.
|
||||
concat_group_splits = pd.concat(group_splits)
|
||||
|
||||
splits.append(concat_group_splits)
|
||||
|
||||
# Concatenate splits for all the groups together.
|
||||
splits_all = pd.concat(splits)
|
||||
|
||||
# Take split by split_index
|
||||
splits_list = [splits_all[splits_all['split_index'] == x].drop('split_index', axis=1) for x in range(len(ratio))]
|
||||
|
||||
return splits_list
|
||||
|
||||
|
||||
def python_stratified_split(
|
||||
|
@ -141,12 +155,19 @@ def python_stratified_split(
|
|||
Returns:
|
||||
list: Splits of the input data as pd.DataFrame.
|
||||
"""
|
||||
# A few preliminary checks.
|
||||
if not (filter_by == "user" or filter_by == "item"):
|
||||
raise ValueError("filter_by should be either 'user' or 'item'.")
|
||||
|
||||
if min_rating < 1:
|
||||
raise ValueError("min_rating should be integer and larger than or equal to 1.")
|
||||
|
||||
if col_user not in data.columns:
|
||||
raise ValueError("Schema of data not valid. Missing User Col")
|
||||
|
||||
if col_item not in data.columns:
|
||||
raise ValueError("Schema of data not valid. Missing Item Col")
|
||||
|
||||
multi_split, ratio = process_split_ratio(ratio)
|
||||
|
||||
split_by_column = col_user if filter_by == "user" else col_item
|
||||
|
@ -162,14 +183,106 @@ def python_stratified_split(
|
|||
col_item=col_item,
|
||||
)
|
||||
|
||||
num_of_splits = len(ratio)
|
||||
splits = [pd.DataFrame({})] * num_of_splits
|
||||
# Split by each group and aggregate splits together.
|
||||
splits = []
|
||||
df_grouped = data.groupby(split_by_column)
|
||||
for name, group in df_grouped:
|
||||
group_splits = split_pandas_data_with_ratios(
|
||||
df_grouped.get_group(name), ratio, resample=True, seed=seed
|
||||
)
|
||||
for x in range(num_of_splits):
|
||||
splits[x] = pd.concat([splits[x], group_splits[x]])
|
||||
|
||||
return splits
|
||||
# Concatenate the list of split dataframes.
|
||||
concat_group_splits = pd.concat(group_splits)
|
||||
|
||||
splits.append(concat_group_splits)
|
||||
|
||||
# Concatenate splits for all the groups together.
|
||||
splits_all = pd.concat(splits)
|
||||
|
||||
# Take split by split_index
|
||||
splits_list = [splits_all[splits_all['split_index'] == x].drop('split_index', axis=1) for x in range(len(ratio))]
|
||||
|
||||
return splits_list
|
||||
|
||||
|
||||
def numpy_stratified_split(X, ratio=0.75, seed=123):
|
||||
|
||||
"""
|
||||
Split the user/item affinity matrix (sparse matrix) into train and test set matrices while maintaining
|
||||
local (i.e. per user) ratios.
|
||||
|
||||
Args:
|
||||
X (np.array, int): a sparse matrix to be split
|
||||
ratio (scalar, float): fraction of the entire dataset to constitute the train set
|
||||
seed (scalar, int): random seed
|
||||
|
||||
Returns:
|
||||
Xtr (np.array, int): train set user/item affinity matrix
|
||||
Xtst (np.array, int): test set user/item affinity matrix
|
||||
|
||||
Basic mechanics:
|
||||
Main points :
|
||||
|
||||
1. In a typical recommender problem, different users rate a different number of items,
|
||||
and therefore the user/affinity matrix has a sparse structure with variable number
|
||||
of zeroes (unrated items) per row (user). Cutting a total amount of ratings will
|
||||
result in a non-homogenou distribution between train and test set, i.e. some test
|
||||
users may have many ratings while other very little if none.
|
||||
|
||||
2. In an unsupervised learning problem, no explicit answer is given. For this reason
|
||||
the split needs to be implemented in a different way then in supervised learningself.
|
||||
In the latter, one typically split the dataset by rows (by examples), ending up with
|
||||
the same number of feautures but different number of examples in the train/test setself.
|
||||
This scheme does not work in the unsupervised case, as part of the rated items needs to
|
||||
be used as a test set for fixed number of users.
|
||||
|
||||
Solution:
|
||||
|
||||
1. Instead of cutting a total percentage, for each user we cut a relative ratio of the rated
|
||||
items. For example, if user1 has rated 4 items and user2 10, cutting 25% will correspond to
|
||||
1 and 2.6 ratings in the test set, approximated as 1 and 3 according to the round() function.
|
||||
In this way, the 0.75 ratio is satified both locally and globally, preserving the original
|
||||
distribution of ratings across the train and test set.
|
||||
|
||||
2. It is easy (and fast) to satisfy this requirements by creating the test via element subtraction
|
||||
from the original datatset X. We first create two copies of X; for each user we select a random
|
||||
sample of local size ratio (point 1) and erase the remaining ratings, obtaining in this way the
|
||||
train set matrix Xtst. The train set matrix is obtained in the opposite way.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
np.random.seed(seed) # set the random seed
|
||||
|
||||
test_cut = int((1 - ratio) * 100) # percentage of ratings to go in the test set
|
||||
|
||||
# initialize train and test set matrices
|
||||
Xtr = X.copy()
|
||||
Xtst = X.copy()
|
||||
|
||||
# find the number of rated movies per user
|
||||
rated = np.sum(Xtr != 0, axis=1)
|
||||
|
||||
# for each user, cut down a test_size% for the test set
|
||||
tst = np.around((rated * test_cut) / 100).astype(int)
|
||||
|
||||
Nusers, Nitems = X.shape # total number of users and items
|
||||
|
||||
for u in range(Nusers):
|
||||
# For each user obtain the index of rated movies
|
||||
idx = np.asarray(np.where(Xtr[u] != 0))[0].tolist()
|
||||
|
||||
# extract a random subset of size n from the set of rated movies without repetition
|
||||
idx_tst = np.random.choice(idx, tst[u], replace=False)
|
||||
idx_train = list(set(idx).difference(set(idx_tst)))
|
||||
|
||||
Xtr[
|
||||
u, idx_tst
|
||||
] = 0 # change the selected rated movies to unrated in the train set
|
||||
Xtst[
|
||||
u, idx_train
|
||||
] = 0 # set the movies that appear already in the train set as 0
|
||||
|
||||
del idx, idx_train, idx_tst
|
||||
|
||||
return Xtr, Xtst
|
||||
|
|
|
@ -56,7 +56,7 @@ def spark_chrono_split(
|
|||
Args:
|
||||
data (spark.DataFrame): Spark DataFrame to be split.
|
||||
ratio (float or list): Ratio for splitting data. If it is a single float number
|
||||
it splits data into two halfs and the ratio argument indicates the ratio of
|
||||
it splits data into two sets and the ratio argument indicates the ratio of
|
||||
training data set; if it is a list of float numbers, the splitter splits
|
||||
data into several portions corresponding to the split ratios. If a list is
|
||||
provided and the ratios are not summed to 1, they will be normalized.
|
||||
|
@ -93,7 +93,7 @@ def spark_chrono_split(
|
|||
ratio = ratio if multi_split else [ratio, 1 - ratio]
|
||||
ratio_index = np.cumsum(ratio)
|
||||
|
||||
window_spec = Window.partitionBy(split_by_column).orderBy(col(col_timestamp).desc())
|
||||
window_spec = Window.partitionBy(split_by_column).orderBy(col(col_timestamp))
|
||||
|
||||
rating_grouped = (
|
||||
data.groupBy(split_by_column)
|
||||
|
@ -141,6 +141,8 @@ def spark_stratified_split(
|
|||
training data set; if it is a list of float numbers, the splitter splits
|
||||
data into several portions corresponding to the split ratios. If a list is
|
||||
provided and the ratios are not summed to 1, they will be normalized.
|
||||
Earlier indexed splits will have earlier times
|
||||
(e.g the latest time per user or item in split[0] <= the earliest time per user or item in split[1])
|
||||
seed (int): Seed.
|
||||
min_rating (int): minimum number of ratings for user or item.
|
||||
filter_by (str): either "user" or "item", depending on which of the two is to filter
|
||||
|
@ -216,10 +218,12 @@ def spark_timestamp_split(
|
|||
Args:
|
||||
data (spark.DataFrame): Spark DataFrame to be split.
|
||||
ratio (float or list): Ratio for splitting data. If it is a single float number
|
||||
it splits data into two halfs and the ratio argument indicates the ratio of
|
||||
it splits data into two sets and the ratio argument indicates the ratio of
|
||||
training data set; if it is a list of float numbers, the splitter splits
|
||||
data into several portions corresponding to the split ratios. If a list is
|
||||
provided and the ratios are not summed to 1, they will be normalized.
|
||||
Earlier indexed splits will have earlier times
|
||||
(e.g the latest time in split[0] <= the earliest time in split[1])
|
||||
col_user (str): column name of user IDs.
|
||||
col_item (str): column name of item IDs.
|
||||
col_timestamp (str): column name of timestamps. Float number represented in
|
||||
|
@ -233,7 +237,7 @@ def spark_timestamp_split(
|
|||
ratio = ratio if multi_split else [ratio, 1 - ratio]
|
||||
ratio_index = np.cumsum(ratio)
|
||||
|
||||
window_spec = Window.orderBy(col(col_timestamp).desc())
|
||||
window_spec = Window.orderBy(col(col_timestamp))
|
||||
rating = data.withColumn("rank", row_number().over(window_spec))
|
||||
|
||||
data_count = rating.count()
|
||||
|
|
|
@ -26,19 +26,6 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class AffinityMatrix:
|
||||
"""
|
||||
|
||||
Args:
|
||||
df (pd.DataFrame): a dataframe containing the data
|
||||
col_user (str): default name for user column
|
||||
col_item (str): default name for item column
|
||||
col_rating (str): default name for rating columns
|
||||
col_time (str): default name for timestamp columns
|
||||
save_model (Bool): if True it saves the item/user maps
|
||||
save_path (str): default path to save item/user maps
|
||||
|
||||
"""
|
||||
|
||||
# initialize class parameters
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -47,11 +34,18 @@ class AffinityMatrix:
|
|||
col_item=DEFAULT_ITEM_COL,
|
||||
col_rating=DEFAULT_RATING_COL,
|
||||
col_pred=PREDICTION_COL,
|
||||
col_time=DEFAULT_TIMESTAMP_COL,
|
||||
save_path=None,
|
||||
debug=False,
|
||||
):
|
||||
"""Generate the user/item affinity matrix from a pandas dataframe and vice versa
|
||||
|
||||
Args:
|
||||
DF (pd.DataFrame): a dataframe containing the data
|
||||
col_user (str): default name for user column
|
||||
col_item (str): default name for item column
|
||||
col_rating (str): default name for rating columns
|
||||
save_path (str): default path to save item/user maps
|
||||
|
||||
"""
|
||||
self.df = DF # dataframe
|
||||
|
||||
# pandas DF parameters
|
||||
|
@ -63,12 +57,10 @@ class AffinityMatrix:
|
|||
# Options to save the model for future use
|
||||
self.save_path = save_path
|
||||
|
||||
def gen_index(self):
|
||||
def _gen_index(self):
|
||||
|
||||
"""
|
||||
Generate the user/item index
|
||||
|
||||
Returns:
|
||||
Generate the user/item index:
|
||||
map_users, map_items: dictionaries mapping the original user/item index to matrix indices
|
||||
map_back_users, map_back_items: dictionaries to map back the matrix elements to the original
|
||||
dataframe indices
|
||||
|
@ -105,13 +97,13 @@ class AffinityMatrix:
|
|||
self.df_.loc[:, "hashedUsers"] = self.df_[self.col_user].map(self.map_users)
|
||||
|
||||
# optionally save the inverse dictionary to work with trained models
|
||||
if self.save_path != None:
|
||||
if self.save_path is not None:
|
||||
|
||||
np.save(self.save_path_ + "/user_dict", self.map_users)
|
||||
np.save(self.save_path_ + "/item_dict", self.map_items)
|
||||
np.save(self.save_path + "/user_dict", self.map_users)
|
||||
np.save(self.save_path + "/item_dict", self.map_items)
|
||||
|
||||
np.save(self.save_path_ + "/user_back_dict", self.map_back_users)
|
||||
np.save(self.save_path_ + "/item_back_dict", self.map_back_items)
|
||||
np.save(self.save_path + "/user_back_dict", self.map_back_users)
|
||||
np.save(self.save_path + "/item_back_dict", self.map_back_items)
|
||||
|
||||
def gen_affinity_matrix(self):
|
||||
|
||||
|
@ -135,7 +127,7 @@ class AffinityMatrix:
|
|||
|
||||
log.info("Generating the user/item affinity matrix...")
|
||||
|
||||
self.gen_index()
|
||||
self._gen_index()
|
||||
|
||||
ratings = self.df_[self.col_rating] # ratings
|
||||
itm_id = self.df_["hashedItems"] # itm_id serving as columns
|
||||
|
|
|
@ -159,4 +159,8 @@ def split_pandas_data_with_ratios(data, ratios, seed=1234, resample=False):
|
|||
|
||||
splits = np.split(data, [round(x * len(data)) for x in split_index])
|
||||
|
||||
# Add split index (this makes splitting by group more efficient).
|
||||
for i in range(len(ratios)):
|
||||
splits[i]['split_index'] = i
|
||||
|
||||
return splits
|
||||
|
|
|
@ -16,89 +16,108 @@
|
|||
# $ python generate_conda_file.py --gpu --pyspark-version 2.4.0
|
||||
|
||||
import argparse
|
||||
import textwrap
|
||||
|
||||
|
||||
CHANNELS = [
|
||||
'conda-forge',
|
||||
'pytorch',
|
||||
'fastai',
|
||||
'defaults',
|
||||
]
|
||||
HELP_MSG = """
|
||||
To create the conda environment:
|
||||
$ conda env create -f {conda_env}.yaml
|
||||
|
||||
To update the conda environment:
|
||||
$ conda env update -f {conda_env}.yaml
|
||||
|
||||
To register the conda environment in Jupyter:
|
||||
$ conda activate {conda_env}
|
||||
$ python -m ipykernel install --user --name {conda_env} --display-name "Python ({conda_env})"
|
||||
"""
|
||||
|
||||
CHANNELS = ["conda-forge", "pytorch", "fastai", "defaults"]
|
||||
|
||||
CONDA_BASE = {
|
||||
'dask': 'dask>=0.17.1',
|
||||
'fastai': 'fastai>=1.0.40',
|
||||
'fastparquet': 'fastparquet>=0.1.6',
|
||||
'gitpython': 'gitpython>=2.1.8',
|
||||
'ipykernel': 'ipykernel>=4.6.1',
|
||||
'jupyter': 'jupyter>=1.0.0',
|
||||
'matplotlib': 'matplotlib>=2.2.2',
|
||||
'numpy': 'numpy>=1.13.3',
|
||||
'pandas': 'pandas>=0.23.4',
|
||||
'pymongo': 'pymongo>=3.6.1',
|
||||
'python': 'python==3.6.8',
|
||||
'pytest': 'pytest>=3.6.4',
|
||||
'seaborn': 'seaborn>=0.8.1',
|
||||
'scikit-learn': 'scikit-learn==0.19.1',
|
||||
'scipy': 'scipy>=1.0.0',
|
||||
'scikit-surprise': 'scikit-surprise>=1.0.6',
|
||||
'tensorflow': 'tensorflow==1.12.0',
|
||||
"dask": "dask>=0.17.1",
|
||||
"fastai": "fastai>=1.0.40",
|
||||
"fastparquet": "fastparquet>=0.1.6",
|
||||
"gitpython": "gitpython>=2.1.8",
|
||||
"ipykernel": "ipykernel>=4.6.1",
|
||||
"jupyter": "jupyter>=1.0.0",
|
||||
"matplotlib": "matplotlib>=2.2.2",
|
||||
"numpy": "numpy>=1.13.3",
|
||||
"pandas": "pandas>=0.23.4",
|
||||
"pymongo": "pymongo>=3.6.1",
|
||||
"python": "python==3.6.8",
|
||||
"pytest": "pytest>=3.6.4",
|
||||
"pytorch": "pytorch-cpu>=1.0.0",
|
||||
"seaborn": "seaborn>=0.8.1",
|
||||
"scikit-learn": "scikit-learn==0.19.1",
|
||||
"scipy": "scipy>=1.0.0",
|
||||
"scikit-surprise": "scikit-surprise>=1.0.6",
|
||||
"tensorflow": "tensorflow==1.12.0",
|
||||
}
|
||||
|
||||
CONDA_PYSPARK = {
|
||||
'pyarrow': 'pyarrow>=0.8.0',
|
||||
'pyspark': 'pyspark==2.3.1',
|
||||
}
|
||||
CONDA_PYSPARK = {"pyarrow": "pyarrow>=0.8.0", "pyspark": "pyspark==2.3.1"}
|
||||
|
||||
CONDA_GPU = {
|
||||
'numba': 'numba>=0.38.1',
|
||||
'tensorflow': 'tensorflow-gpu==1.12.0',
|
||||
}
|
||||
CONDA_GPU = {"numba": "numba>=0.38.1", "pytorch": "pytorch>=1.0.0", "tensorflow": "tensorflow-gpu==1.12.0"}
|
||||
|
||||
PIP_BASE = {
|
||||
'azureml-sdk[notebooks,contrib]': 'azureml-sdk[notebooks,contrib]>=1.0.8',
|
||||
'azure-storage': 'azure-storage>=0.36.0',
|
||||
'black': 'black>=18.6b4',
|
||||
'dataclasses': 'dataclasses>=0.6',
|
||||
'hyperopt': 'hyperopt==0.1.1',
|
||||
'idna': 'idna==2.7',
|
||||
'memory-profiler': 'memory-profiler>=0.54.0',
|
||||
'nvidia-ml-py3': 'nvidia-ml-py3>=7.352.0',
|
||||
'papermill': 'papermill>=0.15.0',
|
||||
'pydocumentdb': 'pydocumentdb>=2.3.3',
|
||||
"azureml-sdk[notebooks,contrib]": "azureml-sdk[notebooks,contrib]==1.0.10",
|
||||
"azure-storage": "azure-storage>=0.36.0",
|
||||
"black": "black>=18.6b4",
|
||||
"dataclasses": "dataclasses>=0.6",
|
||||
"hyperopt": "hyperopt==0.1.1",
|
||||
"idna": "idna==2.7",
|
||||
"memory-profiler": "memory-profiler>=0.54.0",
|
||||
"nvidia-ml-py3": "nvidia-ml-py3>=7.352.0",
|
||||
"papermill": "papermill>=0.15.0",
|
||||
"pydocumentdb": "pydocumentdb>=2.3.3",
|
||||
}
|
||||
|
||||
PIP_PYSPARK = {}
|
||||
PIP_GPU = {}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='This script generates a conda file for different environments. Plain python is the default, flags can be used to add packages needed to support pyspark and gpu functionality')
|
||||
parser.add_argument('--name', help='specify name of conda environment')
|
||||
parser.add_argument('--gpu', action="store_true", help='include packages for gpu support')
|
||||
parser.add_argument('--pyspark', action="store_true", help='include packages for pyspark support')
|
||||
parser.add_argument('--pyspark-version', help='provide specific version of pyspark to use')
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description=textwrap.dedent(
|
||||
"""
|
||||
This script generates a conda file for different environments.
|
||||
Plain python is the default, but flags can be used to support PySpark and GPU functionality"""
|
||||
),
|
||||
epilog=HELP_MSG,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument("--name", help="specify name of conda environment")
|
||||
parser.add_argument(
|
||||
"--gpu", action="store_true", help="include packages for GPU support"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pyspark", action="store_true", help="include packages for PySpark support"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pyspark-version", help="provide specific version of PySpark to use"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# check pyspark version
|
||||
if args.pyspark_version is not None:
|
||||
args.pyspark = True
|
||||
pyspark_version_info = args.pyspark_version.split('.')
|
||||
if len(pyspark_version_info) != 3 or any([not x.isdigit() for x in pyspark_version_info]):
|
||||
raise TypeError('Pyspark version input must be valid numeric format (e.g. --pyspark-version=2.3.1)')
|
||||
pyspark_version_info = args.pyspark_version.split(".")
|
||||
if len(pyspark_version_info) != 3 or any(
|
||||
[not x.isdigit() for x in pyspark_version_info]
|
||||
):
|
||||
raise TypeError(
|
||||
"PySpark version input must be valid numeric format (e.g. --pyspark-version=2.3.1)"
|
||||
)
|
||||
else:
|
||||
args.pyspark_version = "2.3.1"
|
||||
|
||||
# set name for environment and output yaml file
|
||||
conda_env = 'reco_base'
|
||||
conda_file = 'conda_base.yaml'
|
||||
conda_env = "reco_base"
|
||||
if args.gpu and args.pyspark:
|
||||
conda_env = 'reco_full'
|
||||
conda_file = 'conda_full.yaml'
|
||||
conda_env = "reco_full"
|
||||
elif args.gpu:
|
||||
conda_env = 'reco_gpu'
|
||||
conda_file = 'conda_gpu.yaml'
|
||||
conda_env = "reco_gpu"
|
||||
elif args.pyspark:
|
||||
conda_env = 'reco_pyspark'
|
||||
conda_file = 'conda_pyspark.yaml'
|
||||
conda_env = "reco_pyspark"
|
||||
|
||||
# overwrite environment name with user input
|
||||
if args.name is not None:
|
||||
|
@ -109,34 +128,27 @@ if __name__ == '__main__':
|
|||
pip_packages = PIP_BASE
|
||||
if args.pyspark:
|
||||
conda_packages.update(CONDA_PYSPARK)
|
||||
conda_packages['pyspark'] = 'pyspark=={}'.format(args.pyspark_version)
|
||||
conda_packages["pyspark"] = "pyspark=={}".format(args.pyspark_version)
|
||||
pip_packages.update(PIP_PYSPARK)
|
||||
if args.gpu:
|
||||
conda_packages.update(CONDA_GPU)
|
||||
pip_packages.update(PIP_GPU)
|
||||
|
||||
# write out yaml file
|
||||
with open(conda_file, 'w') as f:
|
||||
f.write('name: {}\n'.format(conda_env))
|
||||
f.write('channels:\n')
|
||||
conda_file = "{}.yaml".format(conda_env)
|
||||
with open(conda_file, "w") as f:
|
||||
for line in HELP_MSG.format(conda_env=conda_env).split("\n"):
|
||||
f.write("# {}\n".format(line))
|
||||
f.write("name: {}\n".format(conda_env))
|
||||
f.write("channels:\n")
|
||||
for channel in CHANNELS:
|
||||
f.write('- {}\n'.format(channel))
|
||||
f.write('dependencies:\n')
|
||||
f.write("- {}\n".format(channel))
|
||||
f.write("dependencies:\n")
|
||||
for conda_package in conda_packages.values():
|
||||
f.write('- {}\n'.format(conda_package))
|
||||
f.write('- pip:\n')
|
||||
f.write("- {}\n".format(conda_package))
|
||||
f.write("- pip:\n")
|
||||
for pip_package in pip_packages.values():
|
||||
f.write(' - {}\n'.format(pip_package))
|
||||
f.write(" - {}\n".format(pip_package))
|
||||
|
||||
print("""Generated conda file: {conda_file}
|
||||
|
||||
To create the conda environment:
|
||||
$ conda env create -f {conda_file}
|
||||
|
||||
To update the conda environment:
|
||||
$ conda env update -f {conda_file}
|
||||
|
||||
To register the conda environment in Jupyter:
|
||||
$ conda activate {conda_env}
|
||||
$ python -m ipykernel install --user --name {conda_env} --display-name "Python ({conda_env})"
|
||||
""".format(conda_env=conda_env, conda_file=conda_file))
|
||||
print("Generated conda file: {}".format(conda_file))
|
||||
print(HELP_MSG.format(conda_env=conda_env))
|
||||
|
|
|
@ -1,136 +0,0 @@
|
|||
#!/bin/bash
|
||||
# This script generates a conda file for python, pyspark, gpu or
|
||||
# all environments.
|
||||
# For generating a conda file for running only python code:
|
||||
# $ sh generate_conda_file.sh
|
||||
# For generating a conda file for running python gpu:
|
||||
# $ sh generate_conda_file.sh --gpu
|
||||
# For generating a conda file for running pyspark:
|
||||
# $ sh generate_conda_file.sh --pyspark
|
||||
# For generating a conda file for running python gpu and pyspark:
|
||||
# $ sh generate_conda_file.sh --gpu --pyspark
|
||||
# For generating a conda file for running python gpu and pyspark with a particular version:
|
||||
# $ sh generate_conda_file.sh --gpu --pyspark-version 2.4.0
|
||||
#
|
||||
|
||||
# first check if conda is installed
|
||||
CONDA_BINARY=$(which conda)
|
||||
if [ ! -x "$CONDA_BINARY" ] ; then
|
||||
echo "No conda found!! Please see the SETUP.md file for installation prerequisites."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
# File which contains conda configuration virtual environment name
|
||||
CONDA_FILE="conda_bare.yaml"
|
||||
|
||||
# default CPU-only no-pySpark versions of conda packages.
|
||||
tensorflow="tensorflow"
|
||||
gpu="#"
|
||||
pyspark="#"
|
||||
|
||||
# flags to detect if both CPU and GPU are specified
|
||||
gpu_flag=false
|
||||
pyspark_flag=false
|
||||
|
||||
# default version of pyspark if it is installed
|
||||
pyspark_version=2.3.1
|
||||
|
||||
while [ ! $# -eq 0 ]
|
||||
do
|
||||
case "$1" in
|
||||
--help)
|
||||
echo "Please specify --gpu to install with GPU-support and"
|
||||
echo "--pyspark to install with pySpark support"
|
||||
echo "--pyspark-version X.Y.Z to install version X.Y.Z of pySpark (default: $pyspark_version)"
|
||||
exit
|
||||
;;
|
||||
--gpu)
|
||||
gpu=""
|
||||
tensorflow="tensorflow-gpu"
|
||||
CONDA_FILE="conda_gpu.yaml"
|
||||
gpu_flag=true
|
||||
;;
|
||||
--pyspark)
|
||||
pyspark=""
|
||||
CONDA_FILE="conda_pyspark.yaml"
|
||||
pyspark_flag=true
|
||||
;;
|
||||
--pyspark-version)
|
||||
pyspark=""
|
||||
CONDA_FILE="conda_pyspark.yaml"
|
||||
pyspark_flag=true
|
||||
pyspark_version=$2
|
||||
## update to use with sh
|
||||
good_version=$(echo $pyspark_version | awk '$0 ~ "^[0-9]+([.][0-9]+){2}$"{print 1}')
|
||||
if ! [ "$good_version" = "1" ]; then
|
||||
echo "Inappropriate version of pyspark passed:" $pyspark_version
|
||||
echo "Version must be of format X.Y.Z"
|
||||
exit 1
|
||||
fi
|
||||
shift
|
||||
;;
|
||||
*)
|
||||
echo $"Usage: $0 invalid argument $1 please run with --help for more information."
|
||||
exit 1
|
||||
esac
|
||||
shift
|
||||
done
|
||||
|
||||
if [ "$pyspark_flag" = true ] && [ "$gpu_flag" = true ]; then
|
||||
CONDA_FILE="conda_full.yaml"
|
||||
fi
|
||||
|
||||
# Write conda file with libraries
|
||||
/bin/cat <<EOM >${CONDA_FILE}
|
||||
# To create the conda environment:
|
||||
# $ conda env create -n my_env_name -f conda.yml
|
||||
#
|
||||
# To update the conda environment:
|
||||
# $ conda env update -n my_env_name -f conda.yaml
|
||||
#
|
||||
# To register the conda environment in Jupyter:
|
||||
# $ python -m ipykernel install --user --name my_env_name --display-name "Python (my_env_name)"
|
||||
#
|
||||
|
||||
channels:
|
||||
- conda-forge
|
||||
- pytorch
|
||||
- fastai
|
||||
- defaults
|
||||
dependencies:
|
||||
- python==3.6.8
|
||||
- numpy>=1.13.3
|
||||
- dask>=0.17.1
|
||||
${pyspark}- pyspark==${pyspark_version}
|
||||
- pymongo>=3.6.1
|
||||
- ipykernel>=4.6.1
|
||||
- ${tensorflow}==1.12.0
|
||||
- scikit-surprise>=1.0.6
|
||||
- scikit-learn==0.19.1
|
||||
- jupyter>=1.0.0
|
||||
- fastparquet>=0.1.6
|
||||
${pyspark}- pyarrow>=0.8.0
|
||||
- fastai>=1.0.40
|
||||
- pip:
|
||||
- pandas>=0.23.4
|
||||
- hyperopt==0.1.1
|
||||
- idna==2.7
|
||||
- scipy>=1.0.0
|
||||
- azure-storage>=0.36.0
|
||||
- matplotlib>=2.2.2
|
||||
- seaborn>=0.8.1
|
||||
- pytest>=3.6.4
|
||||
- papermill>=0.15.0
|
||||
- black>=18.6b4
|
||||
- memory-profiler>=0.54.0
|
||||
- azureml-sdk[notebooks,contrib]>=1.0.8
|
||||
${gpu} - numba>=0.38.1
|
||||
- gitpython>=2.1.8
|
||||
- pydocumentdb>=2.3.3
|
||||
- nvidia-ml-py3>=7.352.0
|
||||
- dataclasses>=0.6
|
||||
EOM
|
||||
|
||||
echo "Conda file generated: " $CONDA_FILE
|
||||
|
|
@ -7,9 +7,7 @@ import pytest
|
|||
from reco_utils.common.gpu_utils import get_number_gpus
|
||||
from tests.notebooks_common import OUTPUT_NOTEBOOK, KERNEL_NAME
|
||||
|
||||
|
||||
TOL = 0.05
|
||||
TOL2 = 0.5
|
||||
TOL = 0.5
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
|
@ -62,8 +60,8 @@ def test_ncf_integration(notebooks, size, epochs, expected_values):
|
|||
10,
|
||||
512,
|
||||
{
|
||||
"map": 0.045746,
|
||||
"ndcg": 0.3739307,
|
||||
"map": 0.045746,
|
||||
"ndcg": 0.3739307,
|
||||
"precision": 0.183987,
|
||||
"recall": 0.105546,
|
||||
"map2": 0.049723,
|
||||
|
@ -89,7 +87,7 @@ def test_ncf_deep_dive_integration(
|
|||
results = pm.read_notebook(OUTPUT_NOTEBOOK).dataframe.set_index("name")["value"]
|
||||
|
||||
for key, value in expected_values.items():
|
||||
assert results[key] == pytest.approx(value, rel=TOL*5)
|
||||
assert results[key] == pytest.approx(value, rel=TOL)
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
|
@ -172,6 +170,6 @@ def test_wide_deep(notebooks, size, epochs, expected_values):
|
|||
results = pm.read_notebook(OUTPUT_NOTEBOOK).dataframe.set_index("name")["value"]
|
||||
|
||||
for key, value in expected_values.items():
|
||||
assert results[key] == pytest.approx(value, rel=TOL2)
|
||||
assert results[key] == pytest.approx(value, rel=TOL)
|
||||
|
||||
shutil.rmtree(MODEL_DIR, ignore_errors=True)
|
||||
|
|
|
@ -13,8 +13,24 @@ TOL = 0.05
|
|||
@pytest.mark.parametrize(
|
||||
"size, expected_values",
|
||||
[
|
||||
("1m", {"map": 0.064012679, "ndcg": 0.308012195, "precision": 0.277214771, "recall": 0.109291553}),
|
||||
#("10m", {"map": 0.101402403, "ndcg": 0.321072689, "precision": 0.275765514, "recall": 0.156483292}) skipping for now, investigating this on issue #465
|
||||
(
|
||||
"1m",
|
||||
{
|
||||
"map": 0.064012679,
|
||||
"ndcg": 0.308012195,
|
||||
"precision": 0.277214771,
|
||||
"recall": 0.109291553,
|
||||
},
|
||||
),
|
||||
(
|
||||
"10m",
|
||||
{
|
||||
"map": 0.101402,
|
||||
"ndcg": 0.321073,
|
||||
"precision": 0.275766,
|
||||
"recall": 0.156483
|
||||
}
|
||||
)
|
||||
],
|
||||
)
|
||||
def test_sar_single_node_integration(notebooks, size, expected_values):
|
||||
|
@ -35,8 +51,16 @@ def test_sar_single_node_integration(notebooks, size, expected_values):
|
|||
@pytest.mark.parametrize(
|
||||
"size, expected_values",
|
||||
[
|
||||
("1m", {"map": 0.033914, "ndcg": 0.231570, "precision": 0.211923, "recall": 0.064663}),
|
||||
#("10m", {"map": , "ndcg": , "precision": , "recall": }), # OOM on test machine
|
||||
(
|
||||
"1m",
|
||||
{
|
||||
"map": 0.033914,
|
||||
"ndcg": 0.231570,
|
||||
"precision": 0.211923,
|
||||
"recall": 0.064663,
|
||||
},
|
||||
),
|
||||
# ("10m", {"map": , "ndcg": , "precision": , "recall": }), # OOM on test machine
|
||||
],
|
||||
)
|
||||
def test_baseline_deep_dive_integration(notebooks, size, expected_values):
|
||||
|
@ -58,14 +82,19 @@ def test_baseline_deep_dive_integration(notebooks, size, expected_values):
|
|||
@pytest.mark.parametrize(
|
||||
"size, expected_values",
|
||||
[
|
||||
("1m", dict(rmse=0.89,
|
||||
mae=0.70,
|
||||
rsquared=0.36,
|
||||
exp_var=0.36,
|
||||
map=0.011,
|
||||
ndcg=0.10,
|
||||
precision=0.093,
|
||||
recall=0.025)),
|
||||
(
|
||||
"1m",
|
||||
dict(
|
||||
rmse=0.89,
|
||||
mae=0.70,
|
||||
rsquared=0.36,
|
||||
exp_var=0.36,
|
||||
map=0.011,
|
||||
ndcg=0.10,
|
||||
precision=0.093,
|
||||
recall=0.025,
|
||||
),
|
||||
),
|
||||
# 10m works but takes too long
|
||||
],
|
||||
)
|
||||
|
@ -87,14 +116,19 @@ def test_surprise_svd_integration(notebooks, size, expected_values):
|
|||
@pytest.mark.parametrize(
|
||||
"size, expected_values",
|
||||
[
|
||||
("1m", dict(rmse=0.9555,
|
||||
mae=0.68493,
|
||||
rsquared=0.26547,
|
||||
exp_var=0.26615,
|
||||
map=0.50635,
|
||||
ndcg=0.99966,
|
||||
precision=0.92684,
|
||||
recall=0.50635)),
|
||||
(
|
||||
"1m",
|
||||
dict(
|
||||
rmse=0.9555,
|
||||
mae=0.68493,
|
||||
rsquared=0.26547,
|
||||
exp_var=0.26615,
|
||||
map=0.50635,
|
||||
ndcg=0.99966,
|
||||
precision=0.92684,
|
||||
recall=0.50635,
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
def test_vw_deep_dive_integration(notebooks, size, expected_values):
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from reco_utils.dataset.numpy_splitters import numpy_stratified_split
|
||||
from reco_utils.dataset.python_splitters import numpy_stratified_split
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
|
|
|
@ -4,7 +4,10 @@
|
|||
import pytest
|
||||
import os
|
||||
import papermill as pm
|
||||
from reco_utils.recommender.deeprec.deeprec_utils import download_deeprec_resources, prepare_hparams
|
||||
from reco_utils.recommender.deeprec.deeprec_utils import (
|
||||
download_deeprec_resources,
|
||||
prepare_hparams,
|
||||
)
|
||||
from reco_utils.recommender.deeprec.models.base_model import BaseModel
|
||||
from reco_utils.recommender.deeprec.models.xDeepFM import XDeepFMModel
|
||||
from reco_utils.recommender.deeprec.models.dkn import DKN
|
||||
|
@ -21,13 +24,17 @@ def resource_path():
|
|||
@pytest.mark.gpu
|
||||
@pytest.mark.deeprec
|
||||
def test_model_xdeepfm(resource_path):
|
||||
data_path = os.path.join(resource_path, '../resources/deeprec/xdeepfm')
|
||||
yaml_file = os.path.join(data_path, r'xDeepFM.yaml')
|
||||
data_file = os.path.join(data_path, r'sample_FFM_data.txt')
|
||||
output_file = os.path.join(data_path, r'output.txt')
|
||||
data_path = os.path.join(resource_path, "..", "resources", "deeprec", "xdeepfm")
|
||||
yaml_file = os.path.join(data_path, "xDeepFM.yaml")
|
||||
data_file = os.path.join(data_path, "sample_FFM_data.txt")
|
||||
output_file = os.path.join(data_path, "output.txt")
|
||||
|
||||
if not os.path.exists(yaml_file):
|
||||
download_deeprec_resources(r'https://recodatasets.blob.core.windows.net/deeprec/', data_path, 'xdeepfmresources.zip')
|
||||
download_deeprec_resources(
|
||||
"https://recodatasets.blob.core.windows.net/deeprec/",
|
||||
data_path,
|
||||
"xdeepfmresources.zip",
|
||||
)
|
||||
|
||||
hparams = prepare_hparams(yaml_file, learning_rate=0.01)
|
||||
assert hparams is not None
|
||||
|
@ -36,30 +43,38 @@ def test_model_xdeepfm(resource_path):
|
|||
model = XDeepFMModel(hparams, input_creator)
|
||||
|
||||
assert model.run_eval(data_file) is not None
|
||||
assert isinstance(model.fit(data_file,data_file), BaseModel)
|
||||
assert isinstance(model.fit(data_file, data_file), BaseModel)
|
||||
assert model.predict(data_file, output_file) is not None
|
||||
|
||||
|
||||
|
||||
@pytest.mark.smoke
|
||||
@pytest.mark.gpu
|
||||
@pytest.mark.deeprec
|
||||
def test_model_dkn(resource_path):
|
||||
data_path = os.path.join(resource_path, '../resources/deeprec/dkn')
|
||||
yaml_file = os.path.join(data_path, r'dkn.yaml')
|
||||
train_file = os.path.join(data_path, r'final_test_with_entity.txt')
|
||||
valid_file = os.path.join(data_path, r'final_test_with_entity.txt')
|
||||
wordEmb_file = os.path.join(data_path, r'word_embeddings_100.npy')
|
||||
entityEmb_file = os.path.join(data_path, r'TransE_entity2vec_100.npy')
|
||||
data_path = os.path.join(resource_path, "..", "resources", "deeprec", "dkn")
|
||||
yaml_file = os.path.join(data_path, "dkn.yaml")
|
||||
train_file = os.path.join(data_path, "final_test_with_entity.txt")
|
||||
valid_file = os.path.join(data_path, "final_test_with_entity.txt")
|
||||
wordEmb_file = os.path.join(data_path, "word_embeddings_100.npy")
|
||||
entityEmb_file = os.path.join(data_path, "TransE_entity2vec_100.npy")
|
||||
|
||||
if not os.path.exists(yaml_file):
|
||||
download_deeprec_resources(r'https://recodatasets.blob.core.windows.net/deeprec/', data_path, 'dknresources.zip')
|
||||
download_deeprec_resources(
|
||||
"https://recodatasets.blob.core.windows.net/deeprec/",
|
||||
data_path,
|
||||
"dknresources.zip",
|
||||
)
|
||||
|
||||
hparams = prepare_hparams(yaml_file, wordEmb_file=wordEmb_file,
|
||||
entityEmb_file=entityEmb_file, epochs=1, learning_rate=0.0001)
|
||||
hparams = prepare_hparams(
|
||||
yaml_file,
|
||||
wordEmb_file=wordEmb_file,
|
||||
entityEmb_file=entityEmb_file,
|
||||
epochs=1,
|
||||
learning_rate=0.0001,
|
||||
)
|
||||
input_creator = DKNTextIterator
|
||||
model = DKN(hparams, input_creator)
|
||||
|
||||
assert(isinstance(model.fit(train_file, valid_file), BaseModel))
|
||||
assert isinstance(model.fit(train_file, valid_file), BaseModel)
|
||||
assert model.run_eval(valid_file) is not None
|
||||
|
||||
|
||||
|
|
|
@ -1,136 +0,0 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from reco_utils.dataset.numpy_splitters import numpy_stratified_split
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_specs():
|
||||
return {
|
||||
"number_of_items": 50,
|
||||
"number_of_users": 20,
|
||||
"seed": 123,
|
||||
"ratio": 0.6,
|
||||
"tolerance": 0.01,
|
||||
"fluctuation": 0.02,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def python_int_dataset(test_specs):
|
||||
# fix the the random seed
|
||||
np.random.seed(test_specs["seed"])
|
||||
|
||||
# generates the user/item affinity matrix. Ratings are from 1 to 5, with 0s denoting unrated items
|
||||
return np.random.randint(
|
||||
low=0,
|
||||
high=6,
|
||||
size=(test_specs["number_of_users"], test_specs["number_of_items"]),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def python_float_dataset(test_specs):
|
||||
# fix the the random seed
|
||||
np.random.seed(test_specs["seed"])
|
||||
|
||||
# generates the user/item affinity matrix. Ratings are from 1 to 5, with 0s denoting unrated items
|
||||
return np.random.random(
|
||||
size=(test_specs["number_of_users"], test_specs["number_of_items"])
|
||||
) * 5
|
||||
|
||||
|
||||
|
||||
def test_int_numpy_stratified_splitter(test_specs, python_int_dataset):
|
||||
# generate a syntetic dataset
|
||||
X = python_int_dataset
|
||||
|
||||
# the splitter returns (in order): train and test user/affinity matrices, train and test datafarmes and user/items to matrix maps
|
||||
Xtr, Xtst = numpy_stratified_split(
|
||||
X, ratio=test_specs["ratio"], seed=test_specs["seed"]
|
||||
)
|
||||
|
||||
# check that the generated matrices have the correct dimensions
|
||||
assert (Xtr.shape[0] == X.shape[0]) & (Xtr.shape[1] == X.shape[1])
|
||||
assert (Xtst.shape[0] == X.shape[0]) & (Xtst.shape[1] == X.shape[1])
|
||||
|
||||
X_rated = np.sum(X != 0, axis=1) # number of total rated items per user
|
||||
Xtr_rated = np.sum(Xtr != 0, axis=1) # number of rated items in the train set
|
||||
Xtst_rated = np.sum(Xtst != 0, axis=1) # number of rated items in the test set
|
||||
|
||||
# global split: check that the all dataset is split in the correct ratio
|
||||
assert Xtr_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
assert Xtst_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# This implementation of the stratified splitter performs a random split at the single user level. Here we check
|
||||
# that also this more stringent condition is verified. Note that user to user fluctuations in the split ratio
|
||||
# are stronger than for the entire dataset due to the random nature of the per user splitting.
|
||||
# For this reason we allow a slightly bigger tollerance, as specified in the test_specs()
|
||||
|
||||
assert (
|
||||
(Xtr_rated / X_rated <= test_specs["ratio"] + test_specs["fluctuation"]).all()
|
||||
& (Xtr_rated / X_rated >= test_specs["ratio"] - test_specs["fluctuation"]).all()
|
||||
)
|
||||
|
||||
assert (
|
||||
(
|
||||
Xtst_rated / X_rated
|
||||
<= (1 - test_specs["ratio"]) + test_specs["fluctuation"]
|
||||
).all()
|
||||
& (
|
||||
Xtst_rated / X_rated
|
||||
>= (1 - test_specs["ratio"]) - test_specs["fluctuation"]
|
||||
).all()
|
||||
)
|
||||
|
||||
|
||||
def test_float_numpy_stratified_splitter(test_specs, python_float_dataset):
|
||||
# generate a syntetic dataset
|
||||
X = python_float_dataset
|
||||
|
||||
# the splitter returns (in order): train and test user/affinity matrices, train and test datafarmes and user/items to matrix maps
|
||||
Xtr, Xtst = numpy_stratified_split(
|
||||
X, ratio=test_specs["ratio"], seed=test_specs["seed"]
|
||||
)
|
||||
|
||||
# Tests
|
||||
# check that the generated matrices have the correct dimensions
|
||||
assert (Xtr.shape[0] == X.shape[0]) & (Xtr.shape[1] == X.shape[1])
|
||||
|
||||
assert (Xtst.shape[0] == X.shape[0]) & (Xtst.shape[1] == X.shape[1])
|
||||
|
||||
X_rated = np.sum(X != 0, axis=1) # number of total rated items per user
|
||||
Xtr_rated = np.sum(Xtr != 0, axis=1) # number of rated items in the train set
|
||||
Xtst_rated = np.sum(Xtst != 0, axis=1) # number of rated items in the test set
|
||||
|
||||
# global split: check that the all dataset is split in the correct ratio
|
||||
assert Xtr_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
assert Xtst_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# This implementation of the stratified splitter performs a random split at the single user level. Here we check
|
||||
# that also this more stringent condition is verified. Note that user to user fluctuations in the split ratio
|
||||
# are stronger than for the entire dataset due to the random nature of the per user splitting.
|
||||
# For this reason we allow a slightly bigger tollerance, as specified in the test_specs()
|
||||
|
||||
assert Xtr_rated / X_rated == pytest.approx(
|
||||
test_specs["ratio"], rel=test_specs["fluctuation"]
|
||||
)
|
||||
|
||||
assert Xtst_rated / X_rated == pytest.approx(
|
||||
(1 - test_specs["ratio"]), rel=test_specs["fluctuation"]
|
||||
)
|
|
@ -15,6 +15,7 @@ from reco_utils.dataset.python_splitters import (
|
|||
python_chrono_split,
|
||||
python_random_split,
|
||||
python_stratified_split,
|
||||
numpy_stratified_split,
|
||||
)
|
||||
|
||||
from reco_utils.common.constants import (
|
||||
|
@ -35,6 +36,9 @@ def test_specs():
|
|||
"ratios": [0.2, 0.3, 0.5],
|
||||
"split_numbers": [2, 3, 5],
|
||||
"tolerance": 0.01,
|
||||
"number_of_items": 50,
|
||||
"number_of_users": 20,
|
||||
"fluctuation": 0.02,
|
||||
}
|
||||
|
||||
|
||||
|
@ -58,13 +62,13 @@ def python_dataset(test_specs):
|
|||
|
||||
rating = pd.DataFrame(
|
||||
{
|
||||
DEFAULT_USER_COL: np.random.random_integers(
|
||||
DEFAULT_USER_COL: np.random.randint(
|
||||
1, 5, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_ITEM_COL: np.random.random_integers(
|
||||
DEFAULT_ITEM_COL: np.random.randint(
|
||||
1, 15, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_RATING_COL: np.random.random_integers(
|
||||
DEFAULT_RATING_COL: np.random.randint(
|
||||
1, 5, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_TIMESTAMP_COL: random_date_generator(
|
||||
|
@ -169,25 +173,19 @@ def test_chrono_splitter(test_specs, python_dataset):
|
|||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# Test all time stamps in test are later than that in train for all users.
|
||||
# This is for single-split case.
|
||||
all_later = []
|
||||
for user in test_specs["user_ids"]:
|
||||
df_train = splits[0][splits[0][DEFAULT_USER_COL] == user]
|
||||
df_test = splits[1][splits[1][DEFAULT_USER_COL] == user]
|
||||
|
||||
p = product(df_train[DEFAULT_TIMESTAMP_COL], df_test[DEFAULT_TIMESTAMP_COL])
|
||||
user_later = [a <= b for (a, b) in p]
|
||||
|
||||
all_later.append(user_later)
|
||||
assert all(all_later)
|
||||
|
||||
# Test if both contains the same user list. This is because chrono split is stratified.
|
||||
users_train = splits[0][DEFAULT_USER_COL].unique()
|
||||
users_test = splits[1][DEFAULT_USER_COL].unique()
|
||||
|
||||
assert set(users_train) == set(users_test)
|
||||
|
||||
# Test all time stamps in test are later than that in train for all users.
|
||||
# This is for single-split case.
|
||||
max_train_times = splits[0][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).max()
|
||||
min_test_times = splits[1][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).min()
|
||||
check_times = max_train_times.join(min_test_times, lsuffix='_0', rsuffix='_1')
|
||||
assert all((check_times[DEFAULT_TIMESTAMP_COL + '_0'] < check_times[DEFAULT_TIMESTAMP_COL + '_1']).values)
|
||||
|
||||
# Test multi-split case
|
||||
splits = python_chrono_split(
|
||||
python_dataset, ratio=test_specs["ratios"], min_rating=10, filter_by="user"
|
||||
)
|
||||
|
@ -203,21 +201,23 @@ def test_chrono_splitter(test_specs, python_dataset):
|
|||
test_specs["ratios"][2], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# Test if all splits contain the same user list. This is because chrono split is stratified.
|
||||
users_train = splits[0][DEFAULT_USER_COL].unique()
|
||||
users_test = splits[1][DEFAULT_USER_COL].unique()
|
||||
users_val = splits[2][DEFAULT_USER_COL].unique()
|
||||
assert set(users_train) == set(users_test)
|
||||
assert set(users_train) == set(users_val)
|
||||
|
||||
# Test if timestamps are correctly split. This is for multi-split case.
|
||||
all_later = []
|
||||
for user in test_specs["user_ids"]:
|
||||
df_train = splits[0][splits[0][DEFAULT_USER_COL] == user]
|
||||
df_valid = splits[1][splits[1][DEFAULT_USER_COL] == user]
|
||||
df_test = splits[2][splits[2][DEFAULT_USER_COL] == user]
|
||||
max_train_times = splits[0][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).max()
|
||||
min_test_times = splits[1][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).min()
|
||||
check_times = max_train_times.join(min_test_times, lsuffix='_0', rsuffix='_1')
|
||||
assert all((check_times[DEFAULT_TIMESTAMP_COL + '_0'] < check_times[DEFAULT_TIMESTAMP_COL + '_1']).values)
|
||||
|
||||
p1 = product(df_train[DEFAULT_TIMESTAMP_COL], df_valid[DEFAULT_TIMESTAMP_COL])
|
||||
p2 = product(df_valid[DEFAULT_TIMESTAMP_COL], df_test[DEFAULT_TIMESTAMP_COL])
|
||||
user_later_1 = [a <= b for (a, b) in p1]
|
||||
user_later_2 = [a <= b for (a, b) in p2]
|
||||
|
||||
all_later.append(user_later_1)
|
||||
all_later.append(user_later_2)
|
||||
assert all(all_later)
|
||||
max_test_times = splits[1][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).max()
|
||||
min_val_times = splits[2][[DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL]].groupby(DEFAULT_USER_COL).min()
|
||||
check_times = max_test_times.join(min_val_times, lsuffix='_1', rsuffix='_2')
|
||||
assert all((check_times[DEFAULT_TIMESTAMP_COL + '_1'] < check_times[DEFAULT_TIMESTAMP_COL + '_2']).values)
|
||||
|
||||
|
||||
def test_stratified_splitter(test_specs, python_dataset):
|
||||
|
@ -253,3 +253,117 @@ def test_stratified_splitter(test_specs, python_dataset):
|
|||
test_specs["ratios"][2], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def python_int_dataset(test_specs):
|
||||
# fix the the random seed
|
||||
np.random.seed(test_specs["seed"])
|
||||
|
||||
# generates the user/item affinity matrix. Ratings are from 1 to 5, with 0s denoting unrated items
|
||||
return np.random.randint(
|
||||
low=0,
|
||||
high=6,
|
||||
size=(test_specs["number_of_users"], test_specs["number_of_items"]),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def python_float_dataset(test_specs):
|
||||
# fix the the random seed
|
||||
np.random.seed(test_specs["seed"])
|
||||
|
||||
# generates the user/item affinity matrix. Ratings are from 1 to 5, with 0s denoting unrated items
|
||||
return np.random.random(
|
||||
size=(test_specs["number_of_users"], test_specs["number_of_items"])
|
||||
) * 5
|
||||
|
||||
|
||||
def test_int_numpy_stratified_splitter(test_specs, python_int_dataset):
|
||||
# generate a syntetic dataset
|
||||
X = python_int_dataset
|
||||
|
||||
# the splitter returns (in order): train and test user/affinity matrices, train and test datafarmes and user/items to matrix maps
|
||||
Xtr, Xtst = numpy_stratified_split(
|
||||
X, ratio=test_specs["ratio"], seed=test_specs["seed"]
|
||||
)
|
||||
|
||||
# check that the generated matrices have the correct dimensions
|
||||
assert (Xtr.shape[0] == X.shape[0]) & (Xtr.shape[1] == X.shape[1])
|
||||
assert (Xtst.shape[0] == X.shape[0]) & (Xtst.shape[1] == X.shape[1])
|
||||
|
||||
X_rated = np.sum(X != 0, axis=1) # number of total rated items per user
|
||||
Xtr_rated = np.sum(Xtr != 0, axis=1) # number of rated items in the train set
|
||||
Xtst_rated = np.sum(Xtst != 0, axis=1) # number of rated items in the test set
|
||||
|
||||
# global split: check that the all dataset is split in the correct ratio
|
||||
assert Xtr_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
assert Xtst_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# This implementation of the stratified splitter performs a random split at the single user level. Here we check
|
||||
# that also this more stringent condition is verified. Note that user to user fluctuations in the split ratio
|
||||
# are stronger than for the entire dataset due to the random nature of the per user splitting.
|
||||
# For this reason we allow a slightly bigger tollerance, as specified in the test_specs()
|
||||
|
||||
assert (
|
||||
(Xtr_rated / X_rated <= test_specs["ratio"] + test_specs["fluctuation"]).all()
|
||||
& (Xtr_rated / X_rated >= test_specs["ratio"] - test_specs["fluctuation"]).all()
|
||||
)
|
||||
|
||||
assert (
|
||||
(
|
||||
Xtst_rated / X_rated
|
||||
<= (1 - test_specs["ratio"]) + test_specs["fluctuation"]
|
||||
).all()
|
||||
& (
|
||||
Xtst_rated / X_rated
|
||||
>= (1 - test_specs["ratio"]) - test_specs["fluctuation"]
|
||||
).all()
|
||||
)
|
||||
|
||||
|
||||
def test_float_numpy_stratified_splitter(test_specs, python_float_dataset):
|
||||
# generate a syntetic dataset
|
||||
X = python_float_dataset
|
||||
|
||||
# the splitter returns (in order): train and test user/affinity matrices, train and test datafarmes and user/items to matrix maps
|
||||
Xtr, Xtst = numpy_stratified_split(
|
||||
X, ratio=test_specs["ratio"], seed=test_specs["seed"]
|
||||
)
|
||||
|
||||
# Tests
|
||||
# check that the generated matrices have the correct dimensions
|
||||
assert (Xtr.shape[0] == X.shape[0]) & (Xtr.shape[1] == X.shape[1])
|
||||
|
||||
assert (Xtst.shape[0] == X.shape[0]) & (Xtst.shape[1] == X.shape[1])
|
||||
|
||||
X_rated = np.sum(X != 0, axis=1) # number of total rated items per user
|
||||
Xtr_rated = np.sum(Xtr != 0, axis=1) # number of rated items in the train set
|
||||
Xtst_rated = np.sum(Xtst != 0, axis=1) # number of rated items in the test set
|
||||
|
||||
# global split: check that the all dataset is split in the correct ratio
|
||||
assert Xtr_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
assert Xtst_rated.sum() / (X_rated.sum()) == pytest.approx(
|
||||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# This implementation of the stratified splitter performs a random split at the single user level. Here we check
|
||||
# that also this more stringent condition is verified. Note that user to user fluctuations in the split ratio
|
||||
# are stronger than for the entire dataset due to the random nature of the per user splitting.
|
||||
# For this reason we allow a slightly bigger tollerance, as specified in the test_specs()
|
||||
|
||||
assert Xtr_rated / X_rated == pytest.approx(
|
||||
test_specs["ratio"], rel=test_specs["fluctuation"]
|
||||
)
|
||||
|
||||
assert Xtst_rated / X_rated == pytest.approx(
|
||||
(1 - test_specs["ratio"]), rel=test_specs["fluctuation"]
|
||||
)
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from itertools import product
|
||||
import pytest
|
||||
from reco_utils.dataset.split_utils import min_rating_filter_spark
|
||||
from reco_utils.common.constants import (
|
||||
|
@ -14,6 +13,7 @@ from reco_utils.common.constants import (
|
|||
)
|
||||
|
||||
try:
|
||||
from pyspark.sql import functions as F
|
||||
from pyspark.sql.functions import col
|
||||
from reco_utils.common.spark_utils import start_or_get_spark
|
||||
from reco_utils.dataset.spark_splitters import (
|
||||
|
@ -57,13 +57,13 @@ def python_data(test_specs):
|
|||
|
||||
rating = pd.DataFrame(
|
||||
{
|
||||
DEFAULT_USER_COL: np.random.random_integers(
|
||||
DEFAULT_USER_COL: np.random.randint(
|
||||
1, 5, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_ITEM_COL: np.random.random_integers(
|
||||
DEFAULT_ITEM_COL: np.random.randint(
|
||||
1, 15, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_RATING_COL: np.random.random_integers(
|
||||
DEFAULT_RATING_COL: np.random.randint(
|
||||
1, 5, test_specs["number_of_rows"]
|
||||
),
|
||||
DEFAULT_TIMESTAMP_COL: random_date_generator(
|
||||
|
@ -155,16 +155,7 @@ def test_chrono_splitter(test_specs, spark_dataset):
|
|||
|
||||
assert set(users_train) == set(users_test)
|
||||
|
||||
# Test all time stamps in test are later than that in train for all users.
|
||||
all_later = []
|
||||
for user in test_specs["user_ids"]:
|
||||
dfs_train = splits[0][splits[0][DEFAULT_USER_COL] == user]
|
||||
dfs_test = splits[1][splits[1][DEFAULT_USER_COL] == user]
|
||||
|
||||
user_later = _if_later(dfs_train, dfs_test, col_timestamp=DEFAULT_TIMESTAMP_COL)
|
||||
|
||||
all_later.append(user_later)
|
||||
assert all(all_later)
|
||||
assert _if_later(splits[0], splits[1])
|
||||
|
||||
splits = spark_chrono_split(spark_dataset, ratio=test_specs["ratios"])
|
||||
|
||||
|
@ -178,19 +169,8 @@ def test_chrono_splitter(test_specs, spark_dataset):
|
|||
test_specs["ratios"][2], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
# Test if timestamps are correctly split. This is for multi-split case.
|
||||
all_later = []
|
||||
for user in test_specs["user_ids"]:
|
||||
dfs_train = splits[0][splits[0][DEFAULT_USER_COL] == user]
|
||||
dfs_valid = splits[1][splits[1][DEFAULT_USER_COL] == user]
|
||||
dfs_test = splits[2][splits[2][DEFAULT_USER_COL] == user]
|
||||
|
||||
user_later_1 = _if_later(dfs_train, dfs_valid, col_timestamp=DEFAULT_TIMESTAMP_COL)
|
||||
user_later_2 = _if_later(dfs_valid, dfs_test, col_timestamp=DEFAULT_TIMESTAMP_COL)
|
||||
|
||||
all_later.append(user_later_1)
|
||||
all_later.append(user_later_2)
|
||||
assert all(all_later)
|
||||
assert _if_later(splits[0], splits[1])
|
||||
assert _if_later(splits[1], splits[2])
|
||||
|
||||
|
||||
@pytest.mark.spark
|
||||
|
@ -244,8 +224,12 @@ def test_timestamp_splitter(test_specs, spark_dataset):
|
|||
1 - test_specs["ratio"], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
assert(max_split0 <= min_split1)
|
||||
|
||||
# Test multi split
|
||||
splits = spark_stratified_split(dfs_rating, ratio=test_specs["ratios"])
|
||||
splits = spark_timestamp_split(dfs_rating, ratio=test_specs["ratios"])
|
||||
|
||||
assert splits[0].count() / test_specs["number_of_rows"] == pytest.approx(
|
||||
test_specs["ratios"][0], test_specs["tolerance"]
|
||||
|
@ -257,36 +241,34 @@ def test_timestamp_splitter(test_specs, spark_dataset):
|
|||
test_specs["ratios"][2], test_specs["tolerance"]
|
||||
)
|
||||
|
||||
dfs_train = splits[0]
|
||||
dfs_valid = splits[1]
|
||||
dfs_test = splits[2]
|
||||
max_split0 = splits[0].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
min_split1 = splits[1].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
assert(max_split0 <= min_split1)
|
||||
|
||||
# if valid is later than train.
|
||||
all_later_1 = _if_later(dfs_train, dfs_valid, col_timestamp=DEFAULT_TIMESTAMP_COL)
|
||||
assert all_later_1
|
||||
|
||||
# if test is later than valid.
|
||||
all_later_2 = _if_later(dfs_valid, dfs_test, col_timestamp=DEFAULT_TIMESTAMP_COL)
|
||||
assert all_later_2
|
||||
max_split1 = splits[1].agg(F.max(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
min_split2 = splits[2].agg(F.min(DEFAULT_TIMESTAMP_COL)).first()[0]
|
||||
assert(max_split1 <= min_split2)
|
||||
|
||||
|
||||
def _if_later(data1, data2, col_timestamp=DEFAULT_TIMESTAMP_COL):
|
||||
'''Helper function to test if records in data1 are later than that in data2.
|
||||
def _if_later(data1, data2):
|
||||
"""Helper function to test if records in data1 are earlier than that in data2.
|
||||
Returns:
|
||||
bool: True or False indicating if data1 is later than data2.
|
||||
'''
|
||||
p = product(
|
||||
[
|
||||
x[col_timestamp]
|
||||
for x in data1.select(col_timestamp).collect()
|
||||
],
|
||||
[
|
||||
x[col_timestamp]
|
||||
for x in data2.select(col_timestamp).collect()
|
||||
],
|
||||
)
|
||||
bool: True or False indicating if data1 is earlier than data2.
|
||||
"""
|
||||
x = (data1.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
|
||||
.groupBy(DEFAULT_USER_COL)
|
||||
.agg(F.max(DEFAULT_TIMESTAMP_COL).cast('long').alias('max'))
|
||||
.collect())
|
||||
max_times = {row[DEFAULT_USER_COL]: row['max'] for row in x}
|
||||
|
||||
if_late = [a <= b for (a, b) in p]
|
||||
y = (data2.select(DEFAULT_USER_COL, DEFAULT_TIMESTAMP_COL)
|
||||
.groupBy(DEFAULT_USER_COL)
|
||||
.agg(F.min(DEFAULT_TIMESTAMP_COL).cast('long').alias('min'))
|
||||
.collect())
|
||||
min_times = {row[DEFAULT_USER_COL]: row['min'] for row in y}
|
||||
|
||||
return if_late
|
||||
result = True
|
||||
for user, max_time in max_times.items():
|
||||
result = result and min_times[user] >= max_time
|
||||
|
||||
return result
|
||||
|
|
Загрузка…
Ссылка в новой задаче