Collaborative Filtering vs. Content-based Filtering
Content-Based Filtering
Based on features of items
Genre: Comedy, Action, Drama
Animation: Animated, Not-animated
Language: English, Spanish, Korean
Decade Produced: 1950s, 1980s
Actors: Meryl Streep, Tom Hanks
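As a rough illustration of content-based filtering, the sketch below scores two items by how many of the feature types listed above they share. The catalogue, the movie titles, and the helper names `feature_overlap` and `most_similar` are all hypothetical; a real system would usually encode the features numerically and use a proper similarity measure.

```python
# Hypothetical catalogue: each movie is described by feature types like those above.
movies = {
    "Movie A": {"genre": "Comedy", "animation": "Animated", "language": "English", "decade": "1980s"},
    "Movie B": {"genre": "Comedy", "animation": "Animated", "language": "English", "decade": "1950s"},
    "Movie C": {"genre": "Drama", "animation": "Not-animated", "language": "Korean", "decade": "1980s"},
}


def feature_overlap(a, b):
    """Fraction of shared feature types on which two items have the same value."""
    keys = a.keys() & b.keys()
    matches = sum(1 for k in keys if a[k] == b[k])
    return matches / len(keys)


def most_similar(title, catalogue):
    """Return the other title whose features overlap most with `title`."""
    others = [t for t in catalogue if t != title]
    return max(others, key=lambda t: feature_overlap(catalogue[title], catalogue[t]))


print(most_similar("Movie A", movies))
```

No user ratings are involved here: the recommendation depends only on item features.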
Collaborative Filtering
Based on similar user preferences
Implicit vs. explicit ratings
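A minimal sketch of the collaborative idea, assuming a tiny set of explicit ratings: find the user whose ratings look most like yours, then suggest what they rated and you have not. The data and the helpers `cosine_on_shared` and `recommend` are hypothetical, and this neighbour-based approach is only an illustration; it is not how ALS works internally.

```python
from math import sqrt

# Hypothetical explicit ratings (1-5 stars); a missing key means "not rated".
ratings = {
    "alice": {"m1": 5, "m2": 4, "m3": 1},
    "bob": {"m1": 5, "m2": 5, "m4": 4},
    "carol": {"m1": 1, "m3": 5, "m4": 2},
}


def cosine_on_shared(u, v):
    """Cosine similarity computed only over the items both users rated."""
    shared = u.keys() & v.keys()
    if not shared:
        return 0.0
    dot = sum(u[i] * v[i] for i in shared)
    norm_u = sqrt(sum(u[i] ** 2 for i in shared))
    norm_v = sqrt(sum(v[i] ** 2 for i in shared))
    return dot / (norm_u * norm_v)


def recommend(user, data):
    """Return the most similar user and the items they rated that `user` has not."""
    others = [o for o in data if o != user]
    nearest = max(others, key=lambda o: cosine_on_shared(data[user], data[o]))
    return nearest, sorted(set(data[nearest]) - set(data[user]))


print(recommend("alice", ratings))
```

With implicit data the dictionary values would be counts of behaviour (plays, views, clicks) rather than star ratings.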
Data Preparation
Get Integer IDs
Extract unique userIds and movieIds
Assign unique integers to each id
Rejoin unique integer ids back to the ratings data
from pyspark.sql.functions import array, col, explode, lit, struct, monotonically_increasing_id

def to_long(df, by=["userId"]):
    # "by" is the list of columns that the final output dataframe is grouped by
    cols = [c for c in df.columns if c not in by]
    # Build one (movieId, rating) struct per remaining column and explode them into rows
    kvs = explode(array([
        struct(lit(c).alias("movieId"), col(c).alias("rating")) for c in cols
    ])).alias("kvs")
    long_df = (df.select(by + [kvs])
                 .select(by + ["kvs.movieId", "kvs.rating"])
                 .filter("rating IS NOT NULL"))
    # Null ratings are excluded because ALS in PySpark does not accept blank/null values
    return long_df

# long_ratings is assumed to be the output of to_long(): columns userId, movieId, rating

# 1
users = long_ratings.select("userId").distinct()
users.show()

# 2
# Tell Spark to convert the columns to the proper data types
# (applies when the ids in "ratings" are already numeric but stored as strings)
ratings = ratings.select(ratings.userId.cast("integer"),
                         ratings.movieId.cast("integer"),
                         ratings.rating.cast("double"))

users = users.coalesce(1)  # collapse the dataframe into one partition so the ids are consecutive
users = users.withColumn("userIntId", monotonically_increasing_id()).persist()  # persist keeps the new integer ids stable
users.show()

movies = long_ratings.select("movieId").distinct()
movies = movies.coalesce(1)
movies = movies.withColumn("movieIntId", monotonically_increasing_id()).persist()
movies.show()

# 3
ratings_w_int_ids = long_ratings.join(users, "userId", "left").join(movies, "movieId", "left")
ratings_w_int_ids.show()

# or alternatively 3: keep only the integer ids, renamed to the column names ALS will use
ratings_data = ratings_w_int_ids.select(col("userIntId").alias("userId"),
                                        col("movieIntId").alias("movieId"),
                                        col("rating"))
ratings_data.show()
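The three steps (extract unique ids, assign integers, rejoin) can also be seen without Spark. The sketch below is plain Python on a made-up list of ratings; the variable names are hypothetical and it only mirrors the logic, not the distributed implementation.

```python
# Hypothetical long-format ratings with string identifiers.
long_ratings = [
    ("u_ann", "Toy Story", 4.0),
    ("u_bob", "Heat", 3.5),
    ("u_ann", "Heat", 5.0),
]

# 1. Extract unique ids (sorted so the mapping is reproducible).
user_ids = sorted({u for u, _, _ in long_ratings})
movie_ids = sorted({m for _, m, _ in long_ratings})

# 2. Assign a unique integer to each id.
user_index = {u: i for i, u in enumerate(user_ids)}
movie_index = {m: i for i, m in enumerate(movie_ids)}

# 3. Rejoin the integer ids back to the ratings data.
ratings_w_int_ids = [(user_index[u], movie_index[m], r) for u, m, r in long_ratings]
print(ratings_w_int_ids)
```

The lookup dictionaries should be kept around so that integer ids in the final recommendations can be mapped back to the original identifiers.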
ALS Parameters and Hyperparameters
from pyspark.ml.recommendation import ALS

# alpha only takes effect when implicitPrefs=True
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          rank=25, maxIter=100, regParam=.05, alpha=40,
          nonnegative=True, coldStartStrategy="drop", implicitPrefs=False)

# Fit ALS to training dataset
model = als.fit(training_data)

# Generate predictions on test dataset
predictions = model.transform(test_data)
Arguments
userCol: Name of column that contains user ids
itemCol: Name of column that contains item ids
ratingCol: Name of column that contains ratings
Hyperparameters
rank (k): Number of latent features
maxIter: Maximum number of iterations
regParam: Lambda; the regularization parameter, a penalty term added to the error the model minimizes, to avoid overfitting the training data
alpha: Only used with implicit ratings. Controls how much each observed interaction (e.g. each play or view) adds to the model's confidence that a user actually likes the movie/song.
nonnegative = True: Constrains the latent factors to be nonnegative, so predictions are not negative
coldStartStrategy = "drop": Addresses an issue with the train/test split. Users that appear only in the test set get no valid prediction (NaN), so "drop" removes those rows and RMSE is calculated only on users that have ratings in both the training and test sets.
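To make rank, regParam and alpha more concrete, here is a simplified plain-Python sketch. It assumes a predicted rating is the dot product of a user's and an item's latent vectors, that regParam scales a squared-norm penalty added to the squared error, and that implicit confidence grows as 1 + alpha * (number of interactions). The factor values and function names are invented for illustration, and Spark's actual objective differs in details.

```python
# Hypothetical latent factors with rank = 2 (two latent features per user and item).
user_factors = {"u1": [1.0, 0.5]}
item_factors = {"m1": [2.0, 1.0], "m2": [0.5, 3.0]}
observed = {("u1", "m1"): 3.0, ("u1", "m2"): 2.0}


def predict(user, item):
    """Predicted rating: dot product of the user and item latent vectors."""
    return sum(a * b for a, b in zip(user_factors[user], item_factors[item]))


def regularized_loss(reg_param):
    """Squared error on observed ratings plus reg_param times the squared factor norms."""
    squared_error = sum((r - predict(u, i)) ** 2 for (u, i), r in observed.items())
    all_vectors = list(user_factors.values()) + list(item_factors.values())
    penalty = sum(x * x for vec in all_vectors for x in vec)
    return squared_error + reg_param * penalty


def confidence(num_interactions, alpha):
    """Simplified implicit-feedback confidence: more interactions, more confidence."""
    return 1 + alpha * num_interactions


print(predict("u1", "m1"), regularized_loss(0.1), confidence(3, 40))
```

A larger regParam makes big factor values more expensive, which pulls the model toward smaller, smoother factors.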
# Split the ratings dataframe into training and test data
(training_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=1234)

# Set the ALS hyperparameters
from pyspark.ml.recommendation import ALS
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          rank=10, maxIter=15, regParam=.1,
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False)

# Fit the model to the training_data
model = als.fit(training_data)

# Generate predictions on the test_data
test_predictions = model.transform(test_data)
test_predictions.show()
Build RMSE Evaluator
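An RMSE of 0.633 means that the model's predictions are off by roughly 0.633 rating points, above or below, from the values in the original ratings matrix. Because the errors are squared before averaging, large misses count more heavily than they would in a plain average error.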
# Import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print(RMSE)
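For intuition about what the evaluator computes, a small plain-Python version of RMSE on made-up ratings is sketched below. The numbers and the helper names `rmse` and `mae` are illustrative only.

```python
from math import sqrt

# Hypothetical actual ratings and model predictions for four user/movie pairs.
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 2.5]


def rmse(y_true, y_pred):
    """Square root of the mean squared difference between truth and prediction."""
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def mae(y_true, y_pred):
    """Mean absolute error, shown for comparison."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


print(rmse(actual, predicted), mae(actual, predicted))
```

Here the single miss of a full rating point pushes RMSE above the mean absolute error of 0.5.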
$$\text{Sparsity} = 1 - \frac{\text{Number of Ratings in Matrix}}{(\text{Number of Users}) \times (\text{Number of Movies})}$$
# Number of ratings in matrix
numerator = ratings.count()

# Distinct users and movies
users = ratings.select("userId").distinct().count()
movies = ratings.select("movieId").distinct().count()

# Number of ratings matrix could contain if no empty cells
denominator = users * movies

# Calculating sparsity
sparsity = 1 - (numerator * 1.0 / denominator)
print("Sparsity: ", sparsity)
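A worked example of the same sparsity calculation on a tiny made-up dataset, in plain Python (the ratings list is hypothetical):

```python
# Hypothetical ratings as (userId, movieId, rating) tuples.
ratings = [("u1", "m1", 4.0), ("u1", "m2", 3.0), ("u2", "m1", 5.0), ("u3", "m3", 2.0)]

num_ratings = len(ratings)
num_users = len({u for u, _, _ in ratings})
num_movies = len({m for _, m, _ in ratings})

# Share of cells in the users x movies matrix that are empty.
sparsity = 1 - num_ratings / (num_users * num_movies)
print(sparsity)
```

With 4 ratings in a 3 x 3 matrix, 5 of the 9 cells are empty, so the matrix is about 56% sparse.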
Explore with aggregation function
for data exploration
from pyspark.sql.functions import avg, min

# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movieId").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings.groupBy("movieId").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("userId").count().select(min("count")).show()

# Avg num ratings per user
print("Avg num ratings per user: ")
ratings.groupBy("userId").count().select(avg("count")).show()
ALS model buildout on MovieLens Data
ParamGridBuilder
# Imports ParamGridBuilder package
from pyspark.ml.tuning import ParamGridBuilder

# Creates a ParamGridBuilder, and adds hyperparameters and values
param_grid = (ParamGridBuilder()
              .addGrid(als.rank, [5, 40, 80, 120])
              .addGrid(als.maxIter, [5, 100, 250, 500])
              .addGrid(als.regParam, [.05, .1, 1.5])
              .build())
CrossValidator
# Imports CrossValidator package
from pyspark.ml.tuning import CrossValidator

# Creates cross validator and tells Spark what to use when training
# and evaluating
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)
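It helps to know how much work a grid like this implies before running it. The sketch below counts the combinations in plain Python, using the same hyperparameter values as the grid above; the variable names are hypothetical.

```python
from itertools import product

ranks = [5, 40, 80, 120]
max_iters = [5, 100, 250, 500]
reg_params = [0.05, 0.1, 1.5]
num_folds = 5

# Every combination of rank, maxIter and regParam is one candidate model.
combinations = list(product(ranks, max_iters, reg_params))

# Each candidate is fitted once per fold during cross validation.
total_fits = len(combinations) * num_folds
print(len(combinations), total_fits)
```

That is 48 candidate models and 240 ALS fits, some of them with hundreds of iterations, so a smaller grid is worth considering for a first pass.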
RandomSplit
from pyspark.ml.recommendation import ALS

# Create training and test set (80/20 split)
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

# Build generic ALS model without hyperparameters
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False)
In-order
# Tell Spark what values to try for each hyperparameter
from pyspark.ml.tuning import ParamGridBuilder
param_grid = (ParamGridBuilder()
              .addGrid(als.rank, [5, 40, 80, 120])
              .addGrid(als.maxIter, [5, 100, 250, 500])
              .addGrid(als.regParam, [.05, .1, 1.5])
              .build())

# Tell Spark how to evaluate model performance
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Build cross validation step using CrossValidator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

# Run the cv on the training data
model = cv.fit(training)

# Extract best combination of values from cross validation
best_model = model.bestModel

# Generate test set predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

# Print evaluation metrics and model parameters
print("**Best Model**")
print("RMSE = ", rmse)
print(" Rank: ", best_model.rank)
print(" MaxIter: ", best_model._java_obj.parent().getMaxIter())
print(" RegParam: ", best_model._java_obj.parent().getRegParam())
Model Performance Evaluation and Cleanup
# Generate n recommendations for all users
ALS_recommendations = best_model.recommendForAllUsers(n)  # n is an integer
ALS_recommendations.show()
# The recommendations column that comes out here is nested metadata (an array of movieId/rating structs)

# Cleaning Up Recommendation Output
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")
exploded_recs = spark.sql("SELECT userId, explode(recommendations) AS MovieRec FROM ALS_recs_temp")
exploded_recs.show()
clean_recs = spark.sql("SELECT userId, movieIds_and_ratings.movieId AS movieId, movieIds_and_ratings.rating AS prediction FROM ALS_recs_temp LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings")
clean_recs.show()

# Filtering Recommendations
# Add movie details to each recommendation
clean_recs.join(movie_info, ["movieId"], "left").show()
# Keep only recommendations for movies the user has not already rated
clean_recs.join(movie_ratings, ["userId", "movieId"], "left").filter(movie_ratings.rating.isNull()).show()
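The cleanup amounts to two operations: flatten the nested recommendation lists into one row per (user, movie), then drop movies the user has already rated. A plain-Python sketch on made-up data (all names and values here are hypothetical):

```python
# Hypothetical nested output: one row per user with a list of (movieId, prediction).
als_recommendations = [
    (1, [(10, 4.8), (20, 4.5)]),
    (2, [(10, 4.1), (30, 3.9)]),
]

# Hypothetical (userId, movieId) pairs that already have a rating.
already_rated = {(1, 10), (2, 30)}

# "Explode": one flat (userId, movieId, prediction) row per recommendation.
clean_recs = [
    (user, movie, prediction)
    for user, recs in als_recommendations
    for movie, prediction in recs
]

# Keep only recommendations for movies the user has not rated yet.
new_recs = [row for row in clean_recs if (row[0], row[1]) not in already_rated]
print(new_recs)
```

In the Spark version the same filter is expressed as a left join followed by keeping the rows where the joined rating is null.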
from pyspark.sql.functions import avg, col, min

# Min num implicit ratings for a song
print("Minimum implicit ratings for a song: ")
msd.filter(col("num_plays") > 0).groupBy("songId").count().select(min("count")).show()

# Avg num implicit ratings per song
print("Average implicit ratings per song: ")
msd.filter(col("num_plays") > 0).groupBy("songId").count().select(avg("count")).show()

# Min num implicit ratings from a user
print("Minimum implicit ratings from a user: ")
msd.filter(col("num_plays") > 0).groupBy("userId").count().select(min("count")).show()

# Avg num implicit ratings from users
print("Average implicit ratings per user: ")
msd.filter(col("num_plays") > 0).groupBy("userId").count().select(avg("count")).show()
Rank Ordering Error Metric (ROEM)
$$\text{ROEM} = \frac{\sum_{u,i} r^{t}_{u,i}\,\text{rank}_{u,i}}{\sum_{u,i} r^{t}_{u,i}}$$
RMSE can no longer be used here, because with implicit data there is no true value to compare against. All we have is the number of times a certain song was played and a confidence level (how confident the model is that the user likes that song). Instead, on the test set we check whether the predictions make sense, for example whether the user actually played the song more than once. ROEM measures whether songs with a higher number of plays receive higher predictions; a lower ROEM is better.
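A small plain-Python sketch of ROEM may help. It assumes rank is a per-user percentile rank of the predictions (0.0 for the highest prediction, 1.0 for the lowest), as SQL's PERCENT_RANK would give, and weights each rank by the number of plays. The rows and function names are hypothetical, and ties in predictions are not handled the way PERCENT_RANK handles them.

```python
def percent_rank(position, group_size):
    """Percentile rank of a 1-based position: 0.0 at the top, 1.0 at the bottom."""
    return 0.0 if group_size == 1 else (position - 1) / (group_size - 1)


def roem(rows):
    """rows: (user, num_plays, prediction) tuples. Lower values are better."""
    by_user = {}
    for user, plays, prediction in rows:
        by_user.setdefault(user, []).append((plays, prediction))
    numerator = 0.0
    denominator = 0.0
    for items in by_user.values():
        # Rank each user's songs from highest to lowest prediction.
        ranked = sorted(items, key=lambda item: item[1], reverse=True)
        for position, (plays, _) in enumerate(ranked, start=1):
            numerator += plays * percent_rank(position, len(ranked))
            denominator += plays
    return numerator / denominator


# Predictions that agree with play counts versus predictions that contradict them.
good = [("u1", 10, 0.9), ("u1", 1, 0.2), ("u1", 0, 0.1)]
bad = [("u1", 10, 0.1), ("u1", 1, 0.2), ("u1", 0, 0.9)]
print(roem(good), roem(bad))
```

The well-ordered predictions score close to 0 and the reversed ones close to 1, which is why the model with the smallest ROEM is preferred.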
from pyspark.ml.recommendation import ALS

ranks = [10, 20, 30, 40]
maxIters = [10, 20, 30, 40]
regParams = [.05, .1, .15]
alphas = [20, 40, 60, 80]

# Create and store ALS models
model_list = []
for r in ranks:
    for mi in maxIters:
        for rp in regParams:
            for a in alphas:
                model_list.append(ALS(userCol="userId", itemCol="songId", ratingCol="num_plays",
                                      rank=r, maxIter=mi, regParam=rp, alpha=a,
                                      coldStartStrategy="drop", nonnegative=True, implicitPrefs=True))

# Print the model list, and the length of model_list
print(model_list, "Length of model_list: ", len(model_list))

# Validate
assert len(model_list) == len(ranks) * len(maxIters) * len(regParams) * len(alphas)

# Split the data into training and test sets
(training, test) = msd.randomSplit([0.8, 0.2])

# Building 5 folds within the training set.
train1, train2, train3, train4, train5 = training.randomSplit([0.2, 0.2, 0.2, 0.2, 0.2], seed=1)
fold1 = train2.union(train3).union(train4).union(train5)
fold2 = train3.union(train4).union(train5).union(train1)
fold3 = train4.union(train5).union(train1).union(train2)
fold4 = train5.union(train1).union(train2).union(train3)
fold5 = train1.union(train2).union(train3).union(train4)

# Each pair is (data to fit on, held-out data to predict on)
foldlist = [(fold1, train1), (fold2, train2), (fold3, train3), (fold4, train4), (fold5, train5)]

# Empty list to fill with ROEMs from each model
ROEMS = []

# Loops through all models and all folds
# ROEM() is the expected percentile rank function shown in the ALS_expected_percent_rank_cv code
for model in model_list:
    for ft_pair in foldlist:
        # Fits model to fold within training data
        fitted_model = model.fit(ft_pair[0])
        # Generates predictions using fitted_model on respective CV test data
        predictions = fitted_model.transform(ft_pair[1])
        # Generates and prints a ROEM metric on CV test data
        r = ROEM(predictions)
        print("ROEM: ", r)

    # Fits model to all of training data and generates preds for test data
    v_fitted_model = model.fit(training)
    v_predictions = v_fitted_model.transform(test)
    v_ROEM = ROEM(v_predictions)

    # Adds validation ROEM to ROEM list
    ROEMS.append(v_ROEM)
    print("Validation ROEM: ", v_ROEM)

# Import numpy
import numpy

# Find the index of the smallest ROEM
i = numpy.argmin(ROEMS)
print("Index of smallest ROEM:", i)

# Find ith element of ROEMS
print("Smallest ROEM: ", ROEMS[i])

# Extract the best_model (the original run hard-coded index 38, the value of i in that run)
best_model = model_list[i]

# Extract the Rank
print("Rank: ", best_model.getRank())

# Extract the MaxIter value
print("MaxIter: ", best_model.getMaxIter())

# Extract the RegParam value
print("RegParam: ", best_model.getRegParam())

# Extract the Alpha value
print("Alpha: ", best_model.getAlpha())
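The five folds above are written out by hand. As a sketch of the same pattern in general form, the hypothetical helper below pairs each held-out part with the combination of all the other parts; it uses plain Python lists instead of Spark dataframes, where concatenation plays the role of union.

```python
def make_folds(parts):
    """Pair each held-out part with the concatenation of all the other parts."""
    folds = []
    for k, held_out in enumerate(parts):
        train = [row for j, part in enumerate(parts) if j != k for row in part]
        folds.append((train, held_out))
    return folds


# Hypothetical data already split into three equal parts.
parts = [[1, 2], [3, 4], [5, 6]]
folds = make_folds(parts)
for train, held_out in folds:
    print("fit on", train, "evaluate on", held_out)
```

Every row is held out exactly once, so each candidate model is scored on data it was not fitted on.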
ALS_expected_percent_rank_cv
# Expected percentile rank error metric function (assumes an active SparkSession named spark)
def ROEM(predictions, userCol="userId", itemCol="songId", ratingCol="num_plays"):
    # Creates table that can be queried
    predictions.createOrReplaceTempView("predictions")
    # Sum of total number of plays of all songs
    denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]
    # Calculating rankings of songs predictions by user
    spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")
    # Multiplies the rank of each song by the number of plays and adds the products together
    numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]
    performance = numerator / denominator
    return performance


def ROEM_cv(ratings_df, userCol="userId", itemCol="songId", ratingCol="num_plays",
            ranks=[10, 50, 100, 150, 200], maxIters=[10, 25, 50, 100, 200, 400],
            regParams=[.05, .1, .15], alphas=[10, 40, 80, 100]):
    # Originally run on a subset of the Echo Nest Taste Profile dataset found here:
    # https://labrosa.ee.columbia.edu/millionsong/tasteprofile
    from pyspark.sql.functions import rand
    from pyspark.ml.recommendation import ALS

    ratings_df = ratings_df.orderBy(rand())  # Shuffling to ensure randomness

    # Building train and validation test sets
    train, validate = ratings_df.randomSplit([0.8, 0.2], seed=0)

    # Building 5 folds within the training set.
    test1, test2, test3, test4, test5 = train.randomSplit([0.2, 0.2, 0.2, 0.2, 0.2], seed=1)
    train1 = test2.union(test3).union(test4).union(test5)
    train2 = test3.union(test4).union(test5).union(test1)
    train3 = test4.union(test5).union(test1).union(test2)
    train4 = test5.union(test1).union(test2).union(test3)
    train5 = test1.union(test2).union(test3).union(test4)
    folds = [(train1, test1), (train2, test2), (train3, test3), (train4, test4), (train5, test5)]

    # Creating variables that will be replaced by the best model's hyperparameters for subsequent printing
    best_validation_performance = 9999999999999
    best_rank = 0
    best_maxIter = 0
    best_regParam = 0
    best_alpha = 0
    best_model = 0
    best_predictions = 0

    # Looping through each combination of hyperparameters to ensure all combinations are tested.
    for r in ranks:
        for mi in maxIters:
            for rp in regParams:
                for a in alphas:
                    # Create ALS model
                    als = ALS(rank=r, maxIter=mi, regParam=rp, alpha=a,
                              userCol=userCol, itemCol=itemCol, ratingCol=ratingCol,
                              coldStartStrategy="drop", nonnegative=True, implicitPrefs=True)

                    # Fit the model to each training fold, predict on the matching test fold,
                    # and calculate the expected percentile rank error metric on those predictions
                    performances = [ROEM(als.fit(fold_train).transform(fold_test), userCol, itemCol, ratingCol)
                                    for fold_train, fold_test in folds]

                    # Printing the model's performance on each fold
                    print("Model Parameters: ")
                    print("Rank:", r, " MaxIter:", mi, "RegParam:", rp, "Alpha: ", a)
                    print("Test Percent Rank Errors: ", *performances)

                    # Validating the model's performance on the validation set
                    validation_model = als.fit(train)
                    validation_predictions = validation_model.transform(validate)
                    validation_performance = ROEM(validation_predictions, userCol, itemCol, ratingCol)

                    # Printing model's final expected percentile ranking error metric
                    print("Validation Percent Rank Error: ", validation_performance)
                    print(" ")

                    # Filling in final hyperparameters with those of the best-performing model
                    if validation_performance < best_validation_performance:
                        best_validation_performance = validation_performance
                        best_rank = r
                        best_maxIter = mi
                        best_regParam = rp
                        best_alpha = a
                        best_model = validation_model
                        best_predictions = validation_predictions

    # Printing best model's expected percentile rank and hyperparameters
    print("**Best Model** ")
    print(" Percent Rank Error: ", best_validation_performance)
    print(" Rank: ", best_rank)
    print(" MaxIter: ", best_maxIter)
    print(" RegParam: ", best_regParam)
    print(" Alpha: ", best_alpha)
    return best_model, best_predictions
Binary Implicit Ratings
implicit rating
If the ratings are binary, for example when only predicting 1 and 0, more work can be done on weighting.
Item Weighting: Movies with more user views = higher weight
User Weighting: Users that have seen more movies will have lower weights applied to unseen movies
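The notes do not give formulas for these weights, so the sketch below is only one possible reading. It assumes item weight is the share of users who have seen a movie, and that the weight applied to a user's unseen movies shrinks as the share of the catalogue they have seen grows. The data, both formulas, and the function names are assumptions made for illustration.

```python
# Hypothetical binary view data: user -> set of movies seen.
views = {"u1": {"m1", "m2", "m3"}, "u2": {"m1"}, "u3": {"m1", "m2"}}
all_movies = {"m1", "m2", "m3", "m4"}


def item_weight(movie):
    """Assumed item weighting: share of users who have seen the movie."""
    return sum(movie in seen for seen in views.values()) / len(views)


def user_weight_for_unseen(user):
    """Assumed user weighting: heavier viewers get a lower weight on unseen movies."""
    return 1 - len(views[user]) / len(all_movies)


print(item_weight("m1"), item_weight("m4"))
print(user_weight_for_unseen("u1"), user_weight_for_unseen("u2"))
```

The intuition behind the second weight is that a user who has watched most of the catalogue and still skipped a movie is giving a stronger signal than a user who has barely watched anything.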