大数据培训项目实时推荐算法的实现

实时推荐算法的前提:

  1. 在Redis集群中存储了每一个用户最近对电影的K次评分。实时算法可以快速获取。
  2. 离线推荐算法已经将电影相似度矩阵提前计算到了MongoDB中。
  3. Kafka已经获取到了用户实时的评分数据。

算法过程如下:

实时推荐算法输入为一个评分<userId, movieId, rate, timestamp>,而执行的核心内容包括:获取userId 最近K 次评分、获取movieId 相似K 个电影、计算候选电影的推荐优先级、更新对userId 的实时推荐结果。

1 获取用户的K次最近评分

业务服务器在接收用户评分的时候,默认会将该评分情况以userId, movieId, rate, timestamp的格式插入到Redis中该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可。

import scala.collection.JavaConversions._/**
  * 获取当前最近的M次电影评分
  * @param num  评分的个数
  * @param uid  谁的评分
  * @return
  */
def getUserRecentlyRating(num:Int, uid:Int,jedis:Jedis): Array[(Int,Double)] ={
  //从用户的队列中取出num个评分
  jedis.lrange(“uid:”+uid.toString, 0, num).map{item =>
    val attr = item.split(“\\:”)
    (attr(0).trim.toInt, attr(1).trim.toDouble)
  }.toArray
}

2 获取当前电影相似的K个电影

在离线算法中,已经预先将电影的相似度矩阵进行了计算,所以每个电影movieId 的相似的K 个电影很容易获取:从MongoDB中读取MovieRecs数据,从movieId 在simHash 对应的子哈希表中获取相似度前K 大的那些电影。输出是数据类型为Array[Int]的数组,表示与movieId 相似的电影集合,并命名为candidateMovies 以作为候选电影集合。

/**
  * 获取当前电影K个相似的电影
  * @param num          相似电影的数量
  * @param mid          当前电影的ID
  * @param uid          当前的评分用户
  * @param simMovies    电影相似度矩阵的广播变量值
  * @param mongConfig   MongoDB的配置
  * @return
  */
def getTopSimMovies(num:Int, mid:Int, uid:Int, simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]])(implicit mongConfig: MongConfig): Array[Int] ={
  //从广播变量的电影相似度矩阵中获取当前电影所有的相似电影
  val allSimMovies = simMovies.get(mid).get.toArray
  //获取用户已经观看过得电影
  val ratingExist = ConnHelper.mongoClient(mongConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject(“uid” -> uid)).toArray.map{item =>
    item.get(“mid”).toString.toInt
  }
  //过滤掉已经评分过得电影,并排序输出
  allSimMovies.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}

3 电影推荐优先级计算

对于候选电影集合simiHash和userId 的最近K 个评分recentRatings,算法代码内容如下:

/**
  * 计算待选电影的推荐分数
  * @param simMovies            电影相似度矩阵
  * @param userRecentlyRatings  用户最近的k次评分
  * @param topSimMovies         当前电影最相似的K个电影
  * @return
  */
def computeMovieScores(          simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Doub          le]],userRecentlyRatings:Array[(Int,Double)],topSimMovies: Array[Int]):           Array[(Int,Double)] ={

  //用于保存每一个待选电影和最近评分的每一个电影的权重得分
  val score = scala.collection.mutable.ArrayBuffer[(Int,Double)]()

  //用于保存每一个电影的增强因子数
  val increMap = scala.collection.mutable.HashMap[Int,Int]()

  //用于保存每一个电影的减弱因子数
  val decreMap = scala.collection.mutable.HashMap[Int,Int]()

  for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings){
    val simScore = getMoviesSimScore(simMovies,userRecentlyRating._1,topSimMovie)
    if(simScore > 0.6){
      score += ((topSimMovie, simScore * userRecentlyRating._2 ))
      if(userRecentlyRating._2 > 3){
        increMap(topSimMovie) = increMap.getOrDefault(topSimMovie,0) + 1
      }else{
        decreMap(topSimMovie) = decreMap.getOrDefault(topSimMovie,0) + 1
      }
    }
  }

  score.groupBy(_._1).map{case (mid,sims) =>
    (mid,sims.map(_._2).sum / sims.length + log(increMap.getOrDefault(mid, 1)) – log(decreMap.getOrDefault(mid, 1)))
  }.toArray.sortWith(_._2>_._2)

}

其中,getMovieSimScore是取候选电影和已评分电影的相似度,代码如下:

/**
  * 获取当个电影之间的相似度
  * @param simMovies       电影相似度矩阵
  * @param userRatingMovie 用户已经评分的电影
  * @param topSimMovie     候选电影
  * @return
  */
def getMoviesSimScore(simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]], userRatingMovie:Int, topSimMovie:Int): Double ={
  simMovies.get(topSimMovie) match {
    case Some(sim) => sim.get(userRatingMovie) match {
      case Some(score) => score
      case None => 0.0
    }
    case None => 0.0
  }
}

而log是对数运算,这里实现为取10的对数(常用对数):

//取10的对数
def log(m:Int):Double ={
  math.log(m) / math.log(10)
}

4 将结果保存到mongoDB

saveRecsToMongoDB函数实现了结果的保存:

/**
  * 将数据保存到MongoDB    uid -> 1,  recs -> 22:4.5|45:3.8
  * @param streamRecs  流式的推荐结果
  * @param mongConfig  MongoDB的配置
  */
def saveRecsToMongoDB(uid:Int,streamRecs:Array[(Int,Double)])(implicit mongConfig: MongConfig): Unit ={
  //到StreamRecs的连接
  val streaRecsCollection = ConnHelper.mongoClient(mongConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

  streaRecsCollection.findAndRemove(MongoDBObject(“uid” -> uid))
  streaRecsCollection.insert(MongoDBObject(“uid” -> uid, “recs” ->          streamRecs.map( x => MongoDBObject(“mid”->x._1,“score”->x._2)) ))
}

5 更新实时推荐结果

当计算出候选电影的推荐优先级的数组updatedRecommends<movieId, E>后,这个数组将被发送到Web 后台服务器,与后台服务器上userId 的上次实时推荐结果recentRecommends<movieId, E>进行合并、替换并选出优先级E 前K大的电影作为本次新的实时推荐。具体而言:

a.合并:将updatedRecommends 与recentRecommends 并集合成为一个新的<movieId, E>数组;

b.替换(去重):当updatedRecommends 与recentRecommends 有重复的电影movieId 时,recentRecommends 中movieId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的updatedRecommends的movieId 的推荐优先级;

c.选取TopK:在合并、替换后的<movieId, E>数组上,根据每个movie 的推荐优先级,选择出前K 大的电影,作为本次实时推荐的最终结果。

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。