8.4.6 利用MLlib进行电影推荐

下面介绍MLib进行个性化的电影推荐应用。通过Berkely的这个典型案例,用户可以更加深入地理解MLlib以及学会如何构建自己的MLlib应用。[1]本例中使用MovieLenss收集的72000名用户在1万部影片上的1千万个评分数据集。这里假定这个数据集已经预加载进集群的HDFS文件夹/movielens/large下。为了快速测试代码,可以先使用文件夹/movielens/medium中的小数据集进行测试,这个数据集包含6000名用户在4000部影片上的1百万个评分数据。

1.数据集

本例使用MovieLens数据集中的两个文件:“ratings.dat.”和“movies.dat”。所有的评分数据按照下面的格式存储“ratings.dat”中。


  1. UserID::MovieID::Rating::Timestamp

在“movies.dat”中以下面的格式存储电影信息。


  1. MovieID::Title::Genres

2.协同过滤

协同过滤是推荐系统普遍使用的方法。这些技术的本质目的是填充user-item关联矩阵中的缺失数据项。MLlib 1.0支持基于模型的协同过滤,这时通过一个隐含因子的小集合来预测缺失的数据项。MLlib通过实现交替最小二乘法(ALS)去求出这些隐含因子(latent factors)。图8-25为ALS算法的解析图。ALS算法为了最小化损失函数f,通过交替固定Users或者Movies向量,对损失函数求导,最终求出逼近Ratings矩阵的结果,使用这个结果进行电影推荐,如图8-27所示。

8.4.6 利用MLlib进行电影推荐 - 图1

图8-27 ALS算法

3.配置

针对本例使用一个standalone项目模板。假设在用户的环境中,已经配置好所需路径和文件(实例下载地址为https://github.com/amplab/training/tree/ampcamp4/machine-learning/scala),这些已经在/root/machine-learning/scala/中设置,读者将会在目录下找到以下选项。

·sbt:包含SBT工具的目录。

·build.sbt:SBT项目文件。

·MovieLensALS.scala:用户需要编译和运行的主要Scala主程序。

·solution:包含solution代码的目录。

用户需要编辑、编译和运行的主要文件是MovieLensALS.scala,可以将下面的代码模板拷贝到文件中。


  1. import java.util.Random
  2. import org.apache.log4j.Logger
  3. import org.apache.log4j.Level
  4. import scala.io.Source
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.SparkContext
  7. import org.apache.spark.SparkContext._
  8. import org.apache.spark.rdd._
  9. import org.apache.spark.mllib.recommendation.{ALS Rating MatrixFactorizationModel}
  10. object MovieLensALS {
  11. def mainargs Array[String]) {
  12. Logger.getLogger"org.apache.spark").setLevelLevel.WARN
  13. Logger.getLogger"org.eclipse.jetty.server").setLevelLevel.OFF
  14. if args.length != 1 {
  15. println"Usage: sbt/sbt package \"run movieLensHomeDir\""
  16. exit1
  17. }
  18. /* 配置环境 */
  19. val jarFile = "target/scala-2.10/movielens-als_2.10-0.0.jar"
  20. val sparkHome = "/root/spark"
  21. val master = Source.fromFile"/root/spark-ec2/cluster-url").mkString.trim
  22. val masterHostname = Source.fromFile"/root/spark-ec2/masters").mkString.trim
  23. val conf = new SparkConf()
  24. .setMastermaster
  25. .setSparkHomesparkHome
  26. .setAppName"MovieLensALS"
  27. .set"spark.executor.memory" "8g"
  28. .setJarsSeqjarFile))
  29. val sc = new SparkContextconf
  30. /* 加载评分和电影标题 */
  31. val movieLensHomeDir = "hdfs://" + masterHostname + ":9000" + args0
  32. val ratings = sc.textFilemovieLensHomeDir + "/ratings.dat").map { line =>
  33. val fields = line.split"::"
  34. /* 格式为: (timestamp % 10, Rating(userId, movieId, rating)) */
  35. fields3).toLong % 10 Ratingfields0).toInt fields1).toInt fields2).toDouble))
  36. }
  37. val movies = sc.textFilemovieLensHomeDir + "/movies.dat").map { line =>
  38. val fields = line.split"::"
  39. /* 格式为: (movieId, movieName) */
  40. fields0).toInt fields1))
  41. }.collect.toMap
  42. sc.stop();
  43. }
  44. /** 计算RMSE (Root Mean Squared Error) */
  45. def computeRmsemodel MatrixFactorizationModel data RDD[Rating], n Long = {
  46. ……
  47. }
  48. def elicitateRatingsmovies Seq[(Int String)]) = {
  49. ……
  50. }
  51. }

首先通过文本编辑器打开MovieLensALS文件。


  1. cd /root/machine-learning/scala

vim MovieLensALS.scala#如果不使用vim,还可以使用emacs或者nano进行文本编辑。

可以选用常用的文本编辑器打开文件,将示例拷贝进文件中。

对于任何的Spark计算任务来说,第一步都需要创建SparkConf对象。然后通过它创建SparkContext对象。对于Scala或者Java程序来说,需要配置Spark集群的URL、Spark主目录和用户程序需要的JAR文件进行初始化。对于Python程序来说,只需要配置Spark cluster URL一个参数即可。最后还需要配置一个应用名称,以便在Spark Web UI中确认程序。

①可以参照下面的初始化代码。


  1. val conf = new SparkConf()
  2. .setMastermaster
  3. .setSparkHomesparkHome
  4. .setAppName"MovieLensALS"
  5. .set"spark.executor.memory" "8g"
  6. .setJarsSeqjarFile))
  7. val sc = new SparkContextconf

②使用SparkContext对象读取评分文件。评分文件以“::”作为分隔符。下面的代码解析评分文件的每行创建以(Int,Rating)对为数据项的一个RDD。这里保存时间戳的最后一个数字作为一个随机关键字。Ratingclass是一个对元组(user:Int,product:Int,rating:Double)的包装类,它在MLlib的包org.apache.spark.mllib.recommendation中定义。


  1. val movieLensHomeDir = "hdfs://" + masterHostname + ":9000" + args0
  2. val ratings = sc.textFilemovieLensHomeDir + "/ratings.dat").map { line =>
  3. val fields = line.split"::"
  4. /* 格式为: (timestamp % 10, Rating(userId, movieId, rating)) */
  5. fields3).toLong % 10 Ratingfields0).toInt fields1).toInt fields2).
  6. toDouble))
  7. }

③通过读取movie id和title将其转化电影ID到title的映射。


  1. val movies = sc.textFilemovieLensHomeDir + "/movies.dat").map { line =>
  2. val fields = line.split"::"
  3. /* 格式为: (movieId, movieName) */
  4. fields0).toInt fields1))
  5. }.collect.toMap

至此,用户可以统计一些评分数据。


  1. val numRatings = ratings.count
  2. val numUsers = ratings.map_._2.user).distinct.count
  3. val numMovies = ratings.map_._2.product).distinct.count
  4. println"Got " + numRatings + " ratings from "
  5. + numUsers + " users on " + numMovies + " movies."

4.运行程序

用户应该知道如何运行例子。保存之前编辑的MovieLensALS文件,然后运行下面的命令。


  1. cd /root/machine-learning/scala

如果用户需要运行大数据集,则将参数medium转换为large,进而在大数据集上运行例子:


  1. sbt/sbt package "run /movielens/medium" / *其中/movielens/medium是主程序main函数的输入参数 */

这个命令将会编译MovieLensALS类,然后在/root/machine-learning/scala/target/scala-2.10/目录下创建一个JAR文件,最后运行这个程序,在用户的控制台上显示下面的输出。


  1. Got 1000209 ratings from 6040 users on 3706 movies.

5.启发评级

为了向用户推荐,需要通过用户评价的一些电影了解用户的兴趣。需要统计每个电影接收到的评分,然后根据评分数排序。最后获取评分最高的50部电影,采样出一个小集合进行启发评级。


  1. val mostRatedMovieIds = ratings.map_._2.product
  2. /* 抽取 movie id*/
  3. .countByValue
  4. /* 计算每个movie的评分*/
  5. .toSeq
  6. /* 将数据转换为Seq格式*/
  7. .sortBy(- _._2
  8. /* 通过评分数排序*/
  9. .take50
  10. /* 获得评分数最多的50部movie*/
  11. .map_._1
  12. /* 获取它们的ID*/
  13. val random = new Random0
  14. val selectedMovies = mostRatedMovieIds.filterx => random.nextDouble() < 0.2
  15. .mapx => x moviesx)))
  16. .toSeq

每个被挑选到的电影都需要评分(评分为0~5的整数,如果没有看过,则填0)。方法eclicitateRatings返回用户的评分,用户会分配到一个特殊的用户ID 0。这个评分通过sc.parallelize.转换为一个RDD[Rating]实例。


  1. val myRatings = elicitateRatingsselectedMovies
  2. val myRatingsRDD = sc.parallelizemyRatings

运行以上程序,将会看到和下面类似的提示。


  1. Please rate the following movie 1-5 best), or 0 if not seen):
  2. Raiders of the Lost Ark 1981):

6.切分训练数据

使用MLlib中的ALS算法将会用RDD[Rating]实例作为输入来训练一个模型。ALS算法有一些训练参数。例如,矩阵因子的排名和正则化器的实例。为了确定一个好的训练参数,基于时间戳的最后一位将数据分成3个没有交集的子集,称为训练集、测试集和评价集,并将它们缓存。接下来会通过训练集训练不同的模型,通过RMSE(root mean squared error)方法和评价集选取最好的集合,通过测试集评价最好的模型,并将用户的评分加入测试集中,进而对用户推荐。在这个过程中,由于需要多次访问这些数据,所以会把训练集、评价集和测试集通过persist方法放到内存。


  1. val numPartitions = 20
  2. val training = ratings.filterx => x._1 < 6
  3. .values
  4. .unionmyRatingsRDD
  5. .repartitionnumPartitions
  6. .persist
  7. val validation = ratings.filterx => x._1 >= 6 && x._1 < 8
  8. .values
  9. .repartitionnumPartitions
  10. .persist
  11. val test = ratings.filterx => x._1 >= 8).values.persist
  12. val numTraining = training.count
  13. val numValidation = validation.count
  14. val numTest = test.count
  15. println"Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest

在进行切分之后,用户将会看到下面的日志信息。


  1. Training 602251 validation 198919 test 199049.

7.通过ALS算法进行模型训练

下面使用ALS.train方法来训练一组模型,然后从中评价和选择出最好的模型。ALS所有训练算法中最重要的参数是rank、lambda(正则化常数)和迭代次数iterations。使用的ALS算法中的train方法以下面的方式给出。


  1. object ALS {
  2. def trainratings RDD[Rating], rank Int iterations Int lambda Double
  3. MatrixFactorizationModel = {
  4. ……
  5. }
  6. }

在理想情况下,用户希望能够尝试所有的参数组合来发现最好的状况。但是由于时间的约束,本例只会测试8种组合:两种不同的rank(8和12)、两种不同的lambdas(1.0和10.0)以及两种不同的iterations(10和20)。MLlib中提供方法computeRmse在每个模型的评价集上计算RMSE。RMSE值最小的评价集将被最后选择,RMSE值作为评价指标。


  1. val ranks = List8 12
  2. val lambdas = List0.1 10.0
  3. val numIters = List10 20
  4. var bestModel Option[MatrixFactorizationModel] = None
  5. var bestValidationRmse = Double.MaxValue
  6. var bestRank = 0
  7. var bestLambda = -1.0
  8. var bestNumIter = -1
  9. for rank <- ranks lambda <- lambdas numIter <- numIters {
  10. val model = ALS.traintraining rank numIter lambda
  11. val validationRmse = computeRmsemodel validation numValidation
  12. println"RMSE (validation) = " + validationRmse + " for the model trained
  13. with rank = "
  14. + rank + ", lambda = " + lambda + ", and numIter = " + numIter + "."
  15. if validationRmse < bestValidationRmse {
  16. bestModel = Somemodel
  17. bestValidationRmse = validationRmse
  18. bestRank = rank
  19. bestLambda = lambda
  20. bestNumIter = numIter
  21. }
  22. }
  23. val testRmse = computeRmsebestModel.get test numTest
  24. println"The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
  25. + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is "
  26. + testRmse + "."

程序运行成功后,用户将在控制台看到下面的信息。


  1. The best model was trained using rank 8 and lambda 10.0 and its RMSE on test is 0.8808492431998702.

8.电影推荐

最后可以看到通过训练出的模型推荐给用户哪些电影。推荐是通过生成用户没有评分的电影的(0,movieId)对,然后调用predict方法获取预测。


  1. class MatrixFactorizationModel {
  2. def predictuserProducts RDD[(Int Int)]): RDD[Rating] = {
  3. ……
  4. }
  5. }

获取所有的预测之后,用户可以列出top 50的推荐电影。


  1. val myRatedMovieIds = myRatings.map_.product).toSet
  2. val candidates = sc.parallelizemovies.keys.filter(!myRatedMovieIds.contains_)).toSeq
  3. val recommendations = bestModel.get
  4. .predictcandidates.map((0 _)))
  5. .collect
  6. .sortBy(-_.rating
  7. .take50
  8. var i = 1
  9. println"Movies recommended for you:"
  10. recommendations.foreach { r =>
  11. println"%2d".formati + ": " + moviesr.product))
  12. i += 1
  13. }

用户会得到类似下面的输出。


  1. Movies recommended for you
  2. 1 Silence of the Lambs The 1991
  3. 2 Saving Private Ryan 1998
  4. 3 Godfather The 1972
  5. 4 Star Wars Episode IV - A New Hope 1977
  6. 5 Braveheart 1995
  7. 6 Schindler's List (1993)
  8. 7: Shawshank Redemption, The (1994)
  9. 8: Star Wars: Episode V - The Empire Strikes Back (1980)
  10. 9: Pulp Fiction (1994)
  11. 10: Alien (1979)
  12. ……

由于数据集较旧,显示的基本都是老电影。

[1] 示例参考:http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html