Added original SAR single-node unit tests
Markus Cozowicz 2018-11-19 22:00:18 +01:00
Parent 67ac2a058d
Commit 73e07c3fc3
2 changed files: 287 additions and 24 deletions

View file

@ -49,7 +49,7 @@ class SARPlus:
self,
df,
similarity_type="jaccard",
time_decay_coefficient=False,
time_decay_coefficient=30,
time_now=None,
timedecay_formula=False,
threshold=1,
@ -64,6 +64,8 @@ class SARPlus:
# threshold - items below this number get set to zero in cooccurrence counts
assert threshold > 0
df.createOrReplaceTempView("{prefix}df_train_input".format(**self.header))
if timedecay_formula:
# WARNING: previously we would take the last value in training dataframe and set it
# as a matrix U element
@ -76,10 +78,9 @@ class SARPlus:
# Time T parameter is in days and input time is in seconds
# so we do dt/60/(T*24*60)=dt/(T*24*3600)
# the following is the query we want to run
df.createOrReplaceTempView("{prefix}df_train_input".format(**self.header))
query = self.f(
"""
"""
SELECT
{col_user}, {col_item},
SUM({col_rating} * EXP(-log(2) * (latest_timestamp - CAST({col_timestamp} AS long)) / ({time_decay_coefficient} * 3600 * 24))) as {col_rating}
@ -87,13 +88,32 @@ class SARPlus:
(SELECT CAST(MAX({col_timestamp}) AS long) latest_timestamp FROM {prefix}df_train_input)
GROUP BY {col_user}, {col_item}
CLUSTER BY {col_user}
""",
""",
time_now=time_now,
time_decay_coefficient=time_decay_coefficient,
)
# replace with timedecayed version
df = self.spark.sql(query)
else:
# since SQL is case-insensitive, this check also needs to be case-insensitive
if self.header['col_timestamp'].lower() in [s.name.lower() for s in df.schema]:
# we need to de-duplicate (user, item) pairs, keeping only the latest entry
query = self.f(
"""
SELECT {col_user}, {col_item}, {col_rating}
FROM
(
SELECT
{col_user}, {col_item}, {col_rating},
ROW_NUMBER() OVER (PARTITION BY {col_user}, {col_item} ORDER BY {col_timestamp} DESC) latest
FROM {prefix}df_train_input
)
WHERE latest = 1
"""
)
df = self.spark.sql(query)
df.createOrReplaceTempView(self.f("{prefix}df_train"))
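As a side note on the time-decay branch above: each rating is weighted by EXP(-log(2) * dt / (T * 24 * 3600)), i.e. it halves every time_decay_coefficient days before the newest timestamp, and the weighted ratings are summed per user/item pair. A minimal pandas sketch of the same computation (the column names and toy values are assumptions, not part of the library):

    import numpy as np
    import pandas as pd

    T = 30  # time_decay_coefficient in days
    df = pd.DataFrame({
        "userID":    [1, 1, 2],
        "itemID":    [10, 10, 20],
        "rating":    [1.0, 1.0, 1.0],
        "timestamp": [0, 15 * 24 * 3600, 30 * 24 * 3600],  # seconds
    })
    latest = df["timestamp"].max()
    # halve the weight for every T days the event lies before the latest timestamp
    df["rating"] *= np.exp(-np.log(2) * (latest - df["timestamp"]) / (T * 24 * 3600))
    decayed = df.groupby(["userID", "itemID"], as_index=False)["rating"].sum()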
@ -141,7 +161,7 @@ class SARPlus:
self.item_similarity = self.spark.sql(query)
elif similarity_type == SIM_LIFT:
query = self.f(
"""
"""
SELECT i1, i2, value / (M1.margin * M2.margin) AS value
FROM {prefix}item_cooccurrence A
INNER JOIN {prefix}item_marginal M1 ON A.i1 = M1.i
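For reference, the lift query above divides each co-occurrence count by the marginal occurrence counts of the two items; a small numpy sketch of that formula with a made-up co-occurrence matrix:

    import numpy as np

    # hypothetical symmetric item co-occurrence counts; the diagonal holds item occurrence counts
    cooccurrence = np.array([[4., 2., 1.],
                             [2., 3., 0.],
                             [1., 0., 2.]])
    margin = np.diag(cooccurrence)
    # lift(i1, i2) = cooccurrence(i1, i2) / (margin(i1) * margin(i2))
    lift = cooccurrence / np.outer(margin, margin)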
@ -163,7 +183,7 @@ class SARPlus:
# expand upper triangular to full matrix
query = self.f(
"""
"""
SELECT i1, i2, value
FROM
(
@ -205,7 +225,7 @@ class SARPlus:
)
query = self.f(
"""
"""
SELECT a.{col_user}, a.{col_item}, CAST(a.{col_rating} AS double) {col_rating}
FROM {prefix}df_train a INNER JOIN {prefix}df_test_users b ON a.{col_user} = b.{col_user}
DISTRIBUTE BY {col_user}
@ -215,7 +235,7 @@ class SARPlus:
return self.spark.sql(query)
def recommend_k_items(self, test, cache_path, top_k=10, remove_seen=True, n_user_prediction_partitions=1000):
def recommend_k_items(self, test, cache_path, top_k=10, remove_seen=True, n_user_prediction_partitions=200):
# create item id to continuous index mapping
log.info("sarplus.recommend_k_items 1/3: create item index")
@ -239,7 +259,7 @@ class SARPlus:
# export similarity matrix for C++ backed UDF
log.info("sarplus.recommend_k_items 2/3: prepare similarity matrix")
self.spark.sql(self.f("SELECT * FROM {prefix}item_similarity_mapped ORDER BY i1, i2"))\
self.spark.sql(self.f("SELECT i1, i2, CAST(value AS DOUBLE) value FROM {prefix}item_similarity_mapped ORDER BY i1, i2"))\
.coalesce(1)\
.write.format("eisber.sarplus").mode("overwrite")\
.save(cache_path_output)
@ -258,7 +278,7 @@ class SARPlus:
"""))
schema = StructType([
StructField("userID", IntegerType(), True), # TODO: needs to be the same as col_user
StructField("userID", pred_input.schema[self.header['col_user']].dataType, True),
StructField("itemID", IntegerType(), True),
StructField("score", FloatType(), True)
])
@ -292,8 +312,6 @@ class SARPlus:
.groupby(self.header['col_user'])\
.apply(sar_predict_udf)
# print(df_preds.show())
df_preds.createOrReplaceTempView(self.f("{prefix}predictions"))
return self.spark.sql(self.f("""
@ -306,12 +324,14 @@ class SARPlus:
"""Recommend top K items for all users which are in the test set.
Args:
test: indexed test Spark dataframe
test: test Spark dataframe
top_k: top n items to return
remove_seen: remove items test users have already seen in the past from the recommended set.
"""
# TODO: remove seen
if remove_seen:
raise ValueError("Not implemented")
self.get_user_affinity(test)\
.write.mode("overwrite")\
@ -339,4 +359,4 @@ class SARPlus:
top_k=top_k,
)
return self.spark.sql(query)
return self.spark.sql(query)

View file

@ -1,7 +1,11 @@
import calendar
import datetime
import math
import numpy as np
import pandas as pd
import pytest
import os
from sklearn.model_selection import train_test_split
from pyspark.sql import SparkSession
@ -26,8 +30,10 @@ def spark(app_name="Sample", url="local[*]", memory="1G"):
.config("spark.jars", os.path.dirname(__file__) + "/../../scala/target/scala-2.11/sarplus_2.11-0.2.4.jar")
.config("spark.driver.memory", memory)
.config("spark.sql.shuffle.partitions", "1")
.config("spark.default.parallelism", "1")
.config("spark.sql.crossJoin.enabled", True)
.config("spark.ui.enabled", False)
.config("spark.eventLog.enabled", True)
.getOrCreate()
)
@ -121,27 +127,264 @@ def test_pandas(spark, sample_cache):
def test_e2e(spark, pandas_dummy_dataset, header):
sar = SARPlus(spark, **header)
# TODO: cooccurrence is broken
df = spark.createDataFrame(pandas_dummy_dataset)
sar.fit(df)
# assert 4*4 + 32 == sar.item_similarity.count()
print(sar.item_similarity
.toPandas()
.pivot_table(index='i1', columns='i2', values='value'))
# print(sar.item_similarity
# .toPandas()
# .pivot_table(index='i1', columns='i2', values='value'))
test_df = spark.createDataFrame(pd.DataFrame({
header['col_user']: [3],
header['col_item']: [2]
}))
r1 = sar.recommend_k_items_slow(test_df, top_k=3)
print("slow")
print(r1.show())
r1 = sar.recommend_k_items_slow(test_df, top_k=3, remove_seen=False)\
.toPandas()\
.sort_values([header['col_user'], header['col_item']])\
.reset_index(drop=True)
r2 = sar.recommend_k_items(test_df, "tests/test_e2e_cache", top_k=3, n_user_prediction_partitions=2, remove_seen=False)
print("fast")
print(r2.show())
r2 = sar.recommend_k_items(test_df, "tests/test_e2e_cache", top_k=3, n_user_prediction_partitions=2, remove_seen=False)\
.toPandas()\
.sort_values([header['col_user'], header['col_item']])\
.reset_index(drop=True)
assert (r1.iloc[:,:2] == r2.iloc[:,:2]).all().all()
assert np.allclose(
r1.score.values,
r2.score.values,
1e-3
)
@pytest.fixture(scope="module")
def pandas_dummy(header):
ratings_dict = {
header["col_user"]: [1, 1, 1, 1, 2, 2, 2, 2, 2, 2],
header["col_item"]: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
header["col_rating"]: [1, 2, 3, 4, 5, 1, 2, 3, 4, 5],
}
df = pd.DataFrame(ratings_dict)
return df
@pytest.fixture(scope="module")
def pandas_dummy_timestamp(pandas_dummy, header):
time = 1535133442
time_series = [time + 20 * i for i in range(10)]
df = pandas_dummy
df[header["col_timestamp"]] = time_series
return df
@pytest.fixture(scope="module")
def train_test_dummy_timestamp(pandas_dummy_timestamp):
return train_test_split(pandas_dummy_timestamp, test_size=0.2, random_state=0)
@pytest.fixture(scope="module")
def demo_usage_data(header, sar_settings):
# load the data
data = pd.read_csv(sar_settings["FILE_DIR"] + "demoUsage.csv")
data["rating"] = pd.Series([1] * data.shape[0])
data = data.rename(
columns={
"userId": header["col_user"],
"productId": header["col_item"],
"rating": header["col_rating"],
"timestamp": header["col_timestamp"],
}
)
# convert timestamp
data[header["col_timestamp"]] = data[header["col_timestamp"]].apply(
lambda s: float(
calendar.timegm(
datetime.datetime.strptime(s, "%Y/%m/%dT%H:%M:%S").timetuple()
)
)
)
return data
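The lambda above parses the demoUsage.csv timestamps as UTC and converts them to epoch seconds; a standalone illustration with a made-up timestamp string:

    import calendar
    import datetime

    s = "2015/06/30T11:58:47"  # hypothetical value in the csv's "%Y/%m/%dT%H:%M:%S" format
    epoch_seconds = float(
        calendar.timegm(datetime.datetime.strptime(s, "%Y/%m/%dT%H:%M:%S").timetuple())
    )
    # calendar.timegm interprets the struct_time as UTC, unlike time.mktime which uses local time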
@pytest.fixture(scope="module")
def demo_usage_data_spark(spark, demo_usage_data, header):
data_local = demo_usage_data[[x[1] for x in header.items()]]
# TODO: install pyArrow in DS VM
# spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data = spark.createDataFrame(data_local)
return data
@pytest.fixture(scope="module")
def sar_settings():
return {
# absolute tolerance parameter for matrix equivalence in SAR tests
"ATOL": 1e-8,
# directory of the current file - used to link unit test data
"FILE_DIR": "http://recodatasets.blob.core.windows.net/sarunittest/",
# user ID used in the test files (they are designed for this user ID, this is part of the test)
"TEST_USER_ID": "0003000098E85347",
}
@pytest.mark.parametrize(
"similarity_type, timedecay_formula", [("jaccard", False), ("lift", True)]
)
def test_fit(spark, similarity_type, timedecay_formula, train_test_dummy_timestamp, header):
model = SARPlus(spark, **header)
trainset, testset = train_test_dummy_timestamp
df = spark.createDataFrame(trainset)
df.write.mode("overwrite").saveAsTable("trainset")
df = spark.table("trainset")
model.fit(df,
timedecay_formula=timedecay_formula,
similarity_type=similarity_type)
"""
Main SAR tests are below - load test files which are used for both Scala SAR and Python reference implementations
"""
# Tests 1-6
@pytest.mark.parametrize(
"threshold,similarity_type,file",
[
(1, "cooccurrence", "count"),
(1, "jaccard", "jac"),
(1, "lift", "lift"),
(3, "cooccurrence", "count"),
(3, "jaccard", "jac"),
(3, "lift", "lift"),
],
)
def test_sar_item_similarity(
spark, threshold, similarity_type, file, demo_usage_data, sar_settings, header
):
model = SARPlus(spark, **header)
df = spark.createDataFrame(demo_usage_data)
model.fit(df,
timedecay_formula=False,
time_decay_coefficient=30,
time_now=None,
threshold=threshold,
similarity_type=similarity_type)
# reference
item_similarity_ref = pd.read_csv(sar_settings["FILE_DIR"] + "sim_" + file + str(threshold) + ".csv")
item_similarity_ref = pd.melt(item_similarity_ref,
item_similarity_ref.columns[0],
item_similarity_ref.columns[1:],
'i2',
'value')
item_similarity_ref.columns = ['i1', 'i2', 'value']
item_similarity_ref = item_similarity_ref[item_similarity_ref.value > 0]\
.sort_values(['i1', 'i2'])\
.reset_index(drop=True)
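The reference csv stores the similarity matrix in wide form (one row per i1, one column per i2); pd.melt above reshapes it into the long (i1, i2, value) layout produced by the model. A tiny made-up example of that reshape:

    import pandas as pd

    wide = pd.DataFrame({"item": ["a", "b"], "a": [2, 1], "b": [1, 3]})  # hypothetical wide matrix
    long = pd.melt(wide, wide.columns[0], wide.columns[1:], "i2", "value")
    long.columns = ["i1", "i2", "value"]
    # drop zero entries and order rows, mirroring the test above
    long = long[long.value > 0].sort_values(["i1", "i2"]).reset_index(drop=True)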
# actual
item_similarity = model.item_similarity\
.toPandas()\
.sort_values(['i1', 'i2'])\
.reset_index(drop=True)
if similarity_type == "cooccurrence":
assert((item_similarity_ref == item_similarity).all().all())
else:
assert((item_similarity.iloc[:,:1] == item_similarity_ref.iloc[:,:1]).all().all())
assert np.allclose(
item_similarity.value.values,
item_similarity_ref.value.values
)
# Test 7
def test_user_affinity(spark, demo_usage_data, sar_settings, header):
time_now = demo_usage_data[header["col_timestamp"]].max()
model = SARPlus(spark, **header)
df = spark.createDataFrame(demo_usage_data)
model.fit(df,
timedecay_formula=True,
time_decay_coefficient=30,
time_now=time_now,
similarity_type="cooccurrence")
user_affinity_ref = pd.read_csv(sar_settings["FILE_DIR"] + "user_aff.csv")
user_affinity_ref = pd.melt(user_affinity_ref, user_affinity_ref.columns[0], user_affinity_ref.columns[1:], 'ItemId', 'Rating')
user_affinity_ref = user_affinity_ref[user_affinity_ref.Rating > 0]\
.reset_index(drop=True)
# construct dataframe with test user id we'd like to get the affinity for
df_test = spark.createDataFrame(pd.DataFrame({header['col_user']:[sar_settings["TEST_USER_ID"]]}))
user_affinity = model.get_user_affinity(df_test).toPandas().reset_index(drop=True)
# verify that the item ids are the same
assert (user_affinity[header['col_item']] == user_affinity_ref.ItemId).all()
assert np.allclose(
user_affinity_ref[header['col_rating']].values,
user_affinity['Rating'].values,
atol=sar_settings["ATOL"]
)
# Tests 8-10
@pytest.mark.parametrize(
"threshold,similarity_type,file",
[(3, "cooccurrence", "count"), (3, "jaccard", "jac"), (3, "lift", "lift")],
)
def test_userpred(
spark, threshold, similarity_type, file, header, sar_settings, demo_usage_data
):
time_now = demo_usage_data[header["col_timestamp"]].max()
test_id = '{0}_{1}_{2}'.format(threshold, similarity_type, file)
model = SARPlus(spark, **header, table_prefix=test_id)
df = spark.createDataFrame(demo_usage_data)
model.fit(df,
timedecay_formula=True,
time_decay_coefficient=30,
time_now=time_now,
threshold=threshold,
similarity_type=similarity_type)
url = (sar_settings["FILE_DIR"]
+ "userpred_"
+ file
+ str(threshold)
+ "_userid_only.csv")
pred_ref = pd.read_csv(url)
pred_ref = pd.wide_to_long(pred_ref, ['rec','score'], 'user', 'idx')\
.sort_values('score', ascending=False)\
.reset_index(drop=True)
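The userpred reference files are wide, with numbered rec*/score* columns per user; pd.wide_to_long above stacks them into one (rec, score) row per recommendation. A small hypothetical example:

    import pandas as pd

    pred_wide = pd.DataFrame({
        "user":   ["0003000098E85347"],
        "rec1":   ["A"],  "rec2":   ["B"],
        "score1": [0.9],  "score2": [0.7],
    })
    pred_long = pd.wide_to_long(pred_wide, ["rec", "score"], i="user", j="idx")\
        .sort_values("score", ascending=False)\
        .reset_index(drop=True)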
# Note: it's important to have a separate cache_path for each run, as runs would otherwise interfere with each other
pred = model.recommend_k_items(
spark.createDataFrame(demo_usage_data[
demo_usage_data[header["col_user"]] == sar_settings["TEST_USER_ID"]
]),
cache_path='test_userpred-' + test_id,
top_k=10,
n_user_prediction_partitions=1)
pred = pred.toPandas()\
.sort_values('score', ascending=False)\
.reset_index(drop=True)
assert (pred.MovieId.values == pred_ref.rec.values).all()
assert np.allclose(pred.score.values, pred_ref.score.values, atol=sar_settings["ATOL"])