From 73e07c3fc3ae373a0241f58e4e4fe1a479abc4df Mon Sep 17 00:00:00 2001
From: Markus Cozowicz
Date: Mon, 19 Nov 2018 22:00:18 +0100
Subject: [PATCH] fixed a few bugs; added original SAR single-node unit tests

---
 python/pysarplus/SARPlus.py      |  48 ++++--
 python/tests/test_pyspark_sar.py | 263 +++++++++++++++++++++++++++++--
 2 files changed, 287 insertions(+), 24 deletions(-)

diff --git a/python/pysarplus/SARPlus.py b/python/pysarplus/SARPlus.py
index 95149f9..1f6756e 100644
--- a/python/pysarplus/SARPlus.py
+++ b/python/pysarplus/SARPlus.py
@@ -49,7 +49,7 @@ class SARPlus:
         self,
         df,
         similarity_type="jaccard",
-        time_decay_coefficient=False,
+        time_decay_coefficient=30,
         time_now=None,
         timedecay_formula=False,
         threshold=1,
@@ -64,6 +64,8 @@ class SARPlus:
         # threshold - items below this number get set to zero in cooccurrence counts
         assert threshold > 0
 
+        df.createOrReplaceTempView("{prefix}df_train_input".format(**self.header))
+
         if timedecay_formula:
             # WARNING: previously we would take the last value in training dataframe and set it
             # as a matrix U element
@@ -76,10 +78,9 @@ class SARPlus:
             # Time T parameter is in days and input time is in seconds,
             # so we do dt/60/(T*24*60) = dt/(T*24*3600)
             # the following is the query we want to run
-            df.createOrReplaceTempView("{prefix}df_train_input".format(**self.header))
 
             query = self.f(
-                """
+            """
             SELECT
                 {col_user}, {col_item},
                 SUM({col_rating} * EXP(-log(2) * (latest_timestamp - CAST({col_timestamp} AS long)) / ({time_decay_coefficient} * 3600 * 24))) as {col_rating}
             FROM {prefix}df_train_input,
                 (SELECT CAST(MAX({col_timestamp}) AS long) latest_timestamp FROM {prefix}df_train_input)
             GROUP BY {col_user}, {col_item}
             CLUSTER BY {col_user}
-                """,
+            """,
                 time_now=time_now,
                 time_decay_coefficient=time_decay_coefficient,
             )
 
             # replace with timedecayed version
             df = self.spark.sql(query)
+        else:
+            # since SQL is case-insensitive, this check needs to be case-insensitive as well
+            if self.header['col_timestamp'].lower() in [s.name.lower() for s in df.schema]:
+                # we need to de-duplicate items by using the latest item
+                query = self.f(
+                    """
+                SELECT {col_user}, {col_item}, {col_rating}
+                FROM
+                (
+                    SELECT
+                        {col_user}, {col_item}, {col_rating},
+                        ROW_NUMBER() OVER (PARTITION BY {col_user}, {col_item} ORDER BY {col_timestamp} DESC) latest
+                    FROM {prefix}df_train_input
+                )
+                WHERE latest = 1
+                """
+                )
+
+                df = self.spark.sql(query)
 
         df.createOrReplaceTempView(self.f("{prefix}df_train"))
 
@@ -141,7 +161,7 @@ class SARPlus:
             self.item_similarity = self.spark.sql(query)
         elif similarity_type == SIM_LIFT:
             query = self.f(
-                """
+            """
             SELECT i1, i2, value / (M1.margin * M2.margin) AS value
             FROM {prefix}item_cooccurrence A
                 INNER JOIN {prefix}item_marginal M1 ON A.i1 = M1.i
@@ -163,7 +183,7 @@ class SARPlus:
 
         # expand upper triangular to full matrix
         query = self.f(
-            """
+        """
         SELECT i1, i2, value
         FROM
         (
@@ -205,7 +225,7 @@ class SARPlus:
         )
 
         query = self.f(
-            """
+        """
         SELECT a.{col_user}, a.{col_item}, CAST(a.{col_rating} AS double) {col_rating}
         FROM {prefix}df_train a INNER JOIN {prefix}df_test_users b ON a.{col_user} = b.{col_user}
         DISTRIBUTE BY {col_user}
@@ -215,7 +235,7 @@ class SARPlus:
 
         return self.spark.sql(query)
 
-    def recommend_k_items(self, test, cache_path, top_k=10, remove_seen=True, n_user_prediction_partitions=1000):
+    def recommend_k_items(self, test, cache_path, top_k=10, remove_seen=True, n_user_prediction_partitions=200):
 
         # create item id to continuous index mapping
         log.info("sarplus.recommend_k_items 1/3: create item index")
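The decay term in the query above gives each event a half-life of time_decay_coefficient days: a rating observed exactly T days before the newest timestamp contributes half its weight. A minimal sketch of the same arithmetic in plain Python, using the epoch value from the dummy fixture below; the rating is illustrative:

    import math

    T = 30                        # time_decay_coefficient, in days
    latest = 1535133442           # epoch seconds of the newest event
    t = latest - T * 24 * 3600    # an event exactly one half-life old
    rating = 4.0

    # same expression as the SQL: rating * EXP(-log(2) * dt / (T * 3600 * 24))
    weight = rating * math.exp(-math.log(2) * (latest - t) / (T * 3600 * 24))
    assert abs(weight - rating / 2) < 1e-9  # one half-life halves the affinity
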
@@ -239,7 +259,7 @@ class SARPlus:
 
         # export similarity matrix for C++ backed UDF
         log.info("sarplus.recommend_k_items 2/3: prepare similarity matrix")
-        self.spark.sql(self.f("SELECT * FROM {prefix}item_similarity_mapped ORDER BY i1, i2"))\
+        self.spark.sql(self.f("SELECT i1, i2, CAST(value AS DOUBLE) value FROM {prefix}item_similarity_mapped ORDER BY i1, i2"))\
             .coalesce(1)\
             .write.format("eisber.sarplus").mode("overwrite")\
             .save(cache_path_output)
@@ -258,7 +278,7 @@ class SARPlus:
         """))
 
         schema = StructType([
-            StructField("userID", IntegerType(), True), # TODO: needs to be the same as col_user
+            StructField("userID", pred_input.schema[self.header['col_user']].dataType, True),
             StructField("itemID", IntegerType(), True),
             StructField("score", FloatType(), True)
         ])
@@ -292,8 +312,6 @@ class SARPlus:
             .groupby(self.header['col_user'])\
             .apply(sar_predict_udf)
 
-        # print(df_preds.show())
-
         df_preds.createOrReplaceTempView(self.f("{prefix}predictions"))
 
         return self.spark.sql(self.f("""
@@ -306,12 +324,14 @@ class SARPlus:
         """Recommend top K items for all users in the test set.
 
         Args:
-            test: indexed test Spark dataframe
+            test: test Spark dataframe
             top_k: top n items to return
             remove_seen: remove items test users have already seen in the past from the recommended set.
         """
 
         # TODO: remove seen
+        if remove_seen:
+            raise NotImplementedError("remove_seen is not implemented")
 
         self.get_user_affinity(test)\
             .write.mode("overwrite")\
@@ -339,4 +359,4 @@ class SARPlus:
             top_k=top_k,
         )
 
-        return self.spark.sql(query)
+        return self.spark.sql(query)
\ No newline at end of file
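The schema change above stops hard-coding the user-id column as an integer: a grouped-map pandas UDF must declare an output schema that matches what it actually returns, and the demo usage data uses string user ids. A minimal sketch of the idea, assuming an existing `spark` session; `pred_input` here is a hypothetical stand-in for the prediction input dataframe:

    from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

    # hypothetical prediction input with string user ids
    pred_input = spark.createDataFrame(
        [("0003000098E85347", 1, 1.0)], ["userID", "itemID", "rating"])

    schema = StructType([
        # reuse the input's user-id type instead of assuming IntegerType,
        # so string ids survive the pandas UDF round trip
        StructField("userID", pred_input.schema["userID"].dataType, True),
        StructField("itemID", IntegerType(), True),
        StructField("score", FloatType(), True)
    ])
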
diff --git a/python/tests/test_pyspark_sar.py b/python/tests/test_pyspark_sar.py
index add5227..7c16ebf 100644
--- a/python/tests/test_pyspark_sar.py
+++ b/python/tests/test_pyspark_sar.py
@@ -1,7 +1,11 @@
+import calendar
+import datetime
 import math
+import numpy as np
 import pandas as pd
 import pytest
 import os
+from sklearn.model_selection import train_test_split
 
 from pyspark.sql import SparkSession
@@ -26,8 +30,10 @@ def spark(app_name="Sample", url="local[*]", memory="1G"):
         .config("spark.jars", os.path.dirname(__file__) + "/../../scala/target/scala-2.11/sarplus_2.11-0.2.4.jar")
         .config("spark.driver.memory", memory)
         .config("spark.sql.shuffle.partitions", "1")
+        .config("spark.default.parallelism", "1")
         .config("spark.sql.crossJoin.enabled", True)
         .config("spark.ui.enabled", False)
+        .config("spark.eventLog.enabled", True)
         .getOrCreate()
     )
@@ -121,27 +127,264 @@ def test_pandas(spark, sample_cache):
 def test_e2e(spark, pandas_dummy_dataset, header):
     sar = SARPlus(spark, **header)
 
-    # TODO: cooccurence is broken
     df = spark.createDataFrame(pandas_dummy_dataset)
     sar.fit(df)
 
     # assert 4*4 + 32 == sar.item_similarity.count()
-    print(sar.item_similarity
-          .toPandas()
-          .pivot_table(index='i1', columns='i2', values='value'))
+    # print(sar.item_similarity
+    #       .toPandas()
+    #       .pivot_table(index='i1', columns='i2', values='value'))
 
     test_df = spark.createDataFrame(pd.DataFrame({
         header['col_user']: [3],
         header['col_item']: [2]
     }))
 
-    r1 = sar.recommend_k_items_slow(test_df, top_k=3)
-    print("slow")
-    print(r1.show())
+    r1 = sar.recommend_k_items_slow(test_df, top_k=3, remove_seen=False)\
+        .toPandas()\
+        .sort_values([header['col_user'], header['col_item']])\
+        .reset_index(drop=True)
 
-    r2 = sar.recommend_k_items(test_df, "tests/test_e2e_cache", top_k=3, n_user_prediction_partitions=2, remove_seen=False)
-    print("fast")
-    print(r2.show())
+    r2 = sar.recommend_k_items(test_df, "tests/test_e2e_cache", top_k=3, n_user_prediction_partitions=2, remove_seen=False)\
+        .toPandas()\
+        .sort_values([header['col_user'], header['col_item']])\
+        .reset_index(drop=True)
+
+    assert (r1.iloc[:, :2] == r2.iloc[:, :2]).all().all()
+    assert np.allclose(
+        r1.score.values,
+        r2.score.values,
+        1e-3
+    )
+
+
+@pytest.fixture(scope="module")
+def pandas_dummy(header):
+    ratings_dict = {
+        header["col_user"]: [1, 1, 1, 1, 2, 2, 2, 2, 2, 2],
+        header["col_item"]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+        header["col_rating"]: [1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
+    }
+    df = pd.DataFrame(ratings_dict)
+    return df
+
+
+@pytest.fixture(scope="module")
+def pandas_dummy_timestamp(pandas_dummy, header):
+    time = 1535133442
+    time_series = [time + 20 * i for i in range(10)]
+    df = pandas_dummy
+    df[header["col_timestamp"]] = time_series
+    return df
+
+
+@pytest.fixture(scope="module")
+def train_test_dummy_timestamp(pandas_dummy_timestamp):
+    return train_test_split(pandas_dummy_timestamp, test_size=0.2, random_state=0)
+
+
+@pytest.fixture(scope="module")
+def demo_usage_data(header, sar_settings):
+    # load the data
+    data = pd.read_csv(sar_settings["FILE_DIR"] + "demoUsage.csv")
+    data["rating"] = pd.Series([1] * data.shape[0])
+    data = data.rename(
+        columns={
+            "userId": header["col_user"],
+            "productId": header["col_item"],
+            "rating": header["col_rating"],
+            "timestamp": header["col_timestamp"],
+        }
+    )
+
+    # convert timestamp
+    data[header["col_timestamp"]] = data[header["col_timestamp"]].apply(
+        lambda s: float(
+            calendar.timegm(
+                datetime.datetime.strptime(s, "%Y/%m/%dT%H:%M:%S").timetuple()
+            )
+        )
+    )
+
+    return data
+
+
+@pytest.fixture(scope="module")
+def demo_usage_data_spark(spark, demo_usage_data, header):
+    data_local = demo_usage_data[[x[1] for x in header.items()]]
+    # TODO: install pyArrow in DS VM
+    # spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+    data = spark.createDataFrame(data_local)
+    return data
+
+
+@pytest.fixture(scope="module")
+def sar_settings():
+    return {
+        # absolute tolerance parameter for matrix equivalence in SAR tests
+        "ATOL": 1e-8,
+        # remote location of the unit test data files
+        "FILE_DIR": "http://recodatasets.blob.core.windows.net/sarunittest/",
+        # user ID used in the test files (they are designed for this user ID, this is part of the test)
+        "TEST_USER_ID": "0003000098E85347",
+    }
+
+
+@pytest.mark.parametrize(
+    "similarity_type, timedecay_formula", [("jaccard", False), ("lift", True)]
+)
+def test_fit(spark, similarity_type, timedecay_formula, train_test_dummy_timestamp, header):
+    model = SARPlus(spark, **header)
+
+    trainset, testset = train_test_dummy_timestamp
+
+    df = spark.createDataFrame(trainset)
+    df.write.mode("overwrite").saveAsTable("trainset")
+
+    df = spark.table("trainset")
+
+    model.fit(df,
+              timedecay_formula=timedecay_formula,
+              similarity_type=similarity_type)
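The demo_usage_data fixture above parses its timestamps as UTC: calendar.timegm is the UTC counterpart of time.mktime, so the string-to-epoch conversion round-trips exactly. A small self-contained check; the timestamp value is illustrative, not taken from demoUsage.csv:

    import calendar
    import datetime

    s = "2015/06/30T12:00:00"  # illustrative value in the demoUsage.csv format
    epoch = float(calendar.timegm(
        datetime.datetime.strptime(s, "%Y/%m/%dT%H:%M:%S").timetuple()
    ))
    # interpreting the parsed struct_time as UTC makes the round trip exact
    back = datetime.datetime.fromtimestamp(
        epoch, datetime.timezone.utc).strftime("%Y/%m/%dT%H:%M:%S")
    assert back == s
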
+
+
+"""
+Main SAR tests are below - load test files which are used for both Scala SAR and Python reference implementations
+"""
+
+
+# Tests 1-6
+@pytest.mark.parametrize(
+    "threshold,similarity_type,file",
+    [
+        (1, "cooccurrence", "count"),
+        (1, "jaccard", "jac"),
+        (1, "lift", "lift"),
+        (3, "cooccurrence", "count"),
+        (3, "jaccard", "jac"),
+        (3, "lift", "lift"),
+    ],
+)
+def test_sar_item_similarity(
+    spark, threshold, similarity_type, file, demo_usage_data, sar_settings, header
+):
+    model = SARPlus(spark, **header)
+
+    df = spark.createDataFrame(demo_usage_data)
+    model.fit(df,
+              timedecay_formula=False,
+              time_decay_coefficient=30,
+              time_now=None,
+              threshold=threshold,
+              similarity_type=similarity_type)
+
+    # reference
+    item_similarity_ref = pd.read_csv(sar_settings["FILE_DIR"] + "sim_" + file + str(threshold) + ".csv")
+
+    item_similarity_ref = pd.melt(item_similarity_ref,
+                                  item_similarity_ref.columns[0],
+                                  item_similarity_ref.columns[1:],
+                                  'i2',
+                                  'value')
+    item_similarity_ref.columns = ['i1', 'i2', 'value']
+
+    item_similarity_ref = item_similarity_ref[item_similarity_ref.value > 0]\
+        .sort_values(['i1', 'i2'])\
+        .reset_index(drop=True)
+
+    # actual
+    item_similarity = model.item_similarity\
+        .toPandas()\
+        .sort_values(['i1', 'i2'])\
+        .reset_index(drop=True)
+
+    if similarity_type == "cooccurrence":
+        assert (item_similarity_ref == item_similarity).all().all()
+    else:
+        assert (item_similarity.iloc[:, :2] == item_similarity_ref.iloc[:, :2]).all().all()
+
+        assert np.allclose(
+            item_similarity.value.values,
+            item_similarity_ref.value.values
+        )
+
+
+# Test 7
+def test_user_affinity(spark, demo_usage_data, sar_settings, header):
+    time_now = demo_usage_data[header["col_timestamp"]].max()
+
+    model = SARPlus(spark, **header)
+
+    df = spark.createDataFrame(demo_usage_data)
+    model.fit(df,
+              timedecay_formula=True,
+              time_decay_coefficient=30,
+              time_now=time_now,
+              similarity_type="cooccurrence")
+
+    user_affinity_ref = pd.read_csv(sar_settings["FILE_DIR"] + "user_aff.csv")
+    user_affinity_ref = pd.melt(user_affinity_ref, user_affinity_ref.columns[0], user_affinity_ref.columns[1:], 'ItemId', 'Rating')
+    user_affinity_ref = user_affinity_ref[user_affinity_ref.Rating > 0]\
+        .reset_index(drop=True)
+
+    # construct dataframe with the test user id we'd like to get the affinity for
+    df_test = spark.createDataFrame(pd.DataFrame({header['col_user']: [sar_settings["TEST_USER_ID"]]}))
+    user_affinity = model.get_user_affinity(df_test).toPandas().reset_index(drop=True)
+
+    # verify that the item ids are the same
+    assert (user_affinity[header['col_item']] == user_affinity_ref.ItemId).all()
+
+    assert np.allclose(
+        user_affinity_ref[header['col_rating']].values,
+        user_affinity['Rating'].values,
+        atol=sar_settings["ATOL"]
+    )
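Both reference files above are wide matrices: the first column carries the row id and the remaining column names carry the second id. pd.melt unpivots them into the same long (i1, i2, value) layout that model.item_similarity produces. A tiny stand-in; the values are illustrative, not taken from the reference CSVs:

    import pandas as pd

    wide = pd.DataFrame({"_item": ["a", "b"], "a": [3, 1], "b": [1, 2]})

    long = pd.melt(wide, wide.columns[0], wide.columns[1:], "i2", "value")
    long.columns = ["i1", "i2", "value"]
    long = long[long.value > 0].sort_values(["i1", "i2"]).reset_index(drop=True)
    # one (i1, i2, value) row per non-zero cell of the wide matrix
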
+
+
+# Tests 8-10
+@pytest.mark.parametrize(
+    "threshold,similarity_type,file",
+    [(3, "cooccurrence", "count"), (3, "jaccard", "jac"), (3, "lift", "lift")],
+)
+def test_userpred(
+    spark, threshold, similarity_type, file, header, sar_settings, demo_usage_data
+):
+    time_now = demo_usage_data[header["col_timestamp"]].max()
+
+    test_id = '{0}_{1}_{2}'.format(threshold, similarity_type, file)
+
+    model = SARPlus(spark, **header, table_prefix=test_id)
+
+    df = spark.createDataFrame(demo_usage_data)
+    model.fit(df,
+              timedecay_formula=True,
+              time_decay_coefficient=30,
+              time_now=time_now,
+              threshold=threshold,
+              similarity_type=similarity_type)
+
+    url = (sar_settings["FILE_DIR"] +
+           "userpred_" +
+           file +
+           str(threshold) +
+           "_userid_only.csv")
+
+    pred_ref = pd.read_csv(url)
+    pred_ref = pd.wide_to_long(pred_ref, ['rec', 'score'], 'user', 'idx')\
+        .sort_values('score', ascending=False)\
+        .reset_index(drop=True)
+
+    # Note: it's important to have a separate cache_path for each run as they interfere with each other
+    pred = model.recommend_k_items(
+        spark.createDataFrame(demo_usage_data[
+            demo_usage_data[header["col_user"]] == sar_settings["TEST_USER_ID"]
+        ]),
+        cache_path='test_userpred-' + test_id,
+        top_k=10,
+        n_user_prediction_partitions=1)
+
+    pred = pred.toPandas()\
+        .sort_values('score', ascending=False)\
+        .reset_index(drop=True)
+
+    assert (pred.MovieId.values == pred_ref.rec.values).all()
+    assert np.allclose(pred.score.values, pred_ref.score.values, atol=sar_settings["ATOL"])
\ No newline at end of file
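The userpred reference files pair each user with rec1..recN and score1..scoreN columns; pd.wide_to_long splits those stub names on the trailing index, yielding one (rec, score) row per recommendation. A minimal sketch under that assumption about the column layout, with made-up values (the real files come from FILE_DIR above):

    import pandas as pd

    # illustrative stand-in for userpred_*_userid_only.csv (two items shown)
    pred_ref = pd.DataFrame({
        "user": ["0003000098E85347"],
        "rec1": ["A"], "rec2": ["B"],
        "score1": [0.9], "score2": [0.7],
    })

    # stub names 'rec'/'score' are split on the trailing index 'idx'
    long = pd.wide_to_long(pred_ref, ["rec", "score"], "user", "idx")\
        .sort_values("score", ascending=False)\
        .reset_index(drop=True)
    # list(long.rec) == ["A", "B"], list(long.score) == [0.9, 0.7]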