大数据培训项目实时推荐算法的实现

发布时间:2021年12月21日作者:atguigu浏览次数:236

实时推荐算法的前提:

  1. 在Redis集群中存储了每一个用户最近对电影的K次评分。实时算法可以快速获取。
  2. 离线推荐算法已经将电影相似度矩阵提前计算到了MongoDB中。
  3. Kafka已经获取到了用户实时的评分数据。

算法过程如下:

实时推荐算法输入为一个评分<userId, movieId, rate, timestamp>,而执行的核心内容包括:获取userId 最近K 次评分、获取movieId 最相似K 个电影、计算候选电影的推荐优先级、更新对userId 的实时推荐结果。

1 获取用户的K次最近评分

业务服务器在接收用户评分的时候,默认会将该评分情况以userId, movieId, rate, timestamp的格式插入到Redis中该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可。

import scala.collection.JavaConversions._/**
  *
获取当前最近的M次电影评分
 
* @param num  评分的个数
 
* @param uid  谁的评分
 
* @return
 
*/
def getUserRecentlyRating(num:Int, uid:Int,jedis:Jedis): Array[(Int,Double)] ={
  //从用户的队列中取出num个评分
 
jedis.lrange(“uid:”+uid.toString, 0, num).map{item =>
    val attr = item.split(\\:”)
    (attr(0).trim.toInt, attr(1).trim.toDouble)
  }.toArray
}

2 获取当前电影最相似的K个电影

在离线算法中,已经预先将电影的相似度矩阵进行了计算,所以每个电影movieId 的最相似的K 个电影很容易获取:从MongoDB中读取MovieRecs数据,从movieId 在simHash 对应的子哈希表中获取相似度前K 大的那些电影。输出是数据类型为Array[Int]的数组,表示与movieId 最相似的电影集合,并命名为candidateMovies 以作为候选电影集合。

/**
  *
获取当前电影K个相似的电影
 
* @param num          相似电影的数量
 
* @param mid          当前电影的ID
  *
@param uid          当前的评分用户
 
* @param simMovies    电影相似度矩阵的广播变量值
 
* @param mongConfig   MongoDB的配置
 
* @return
 
*/
def getTopSimMovies(num:Int, mid:Int, uid:Int, simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]])(implicit mongConfig: MongConfig): Array[Int] ={
  //从广播变量的电影相似度矩阵中获取当前电影所有的相似电影
 
val allSimMovies = simMovies.get(mid).get.toArray
  //获取用户已经观看过得电影
 
val ratingExist = ConnHelper.mongoClient(mongConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject(“uid” -> uid)).toArray.map{item =>
    item.get(“mid”).toString.toInt
  }
  //过滤掉已经评分过得电影,并排序输出
 
allSimMovies.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
}

3 电影推荐优先级计算

对于候选电影集合simiHash和userId 的最近K 个评分recentRatings,算法代码内容如下:

/**
  *
计算待选电影的推荐分数
 
* @param simMovies            电影相似度矩阵
 
* @param userRecentlyRatings  用户最近的k次评分
 
* @param topSimMovies         当前电影最相似的K个电影
 
* @return
 
*/
def computeMovieScores(          simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Doub          le]],userRecentlyRatings:Array[(Int,Double)],topSimMovies: Array[Int]):           Array[(Int,Double)] ={

  //用于保存每一个待选电影和最近评分的每一个电影的权重得分
 
val score = scala.collection.mutable.ArrayBuffer[(Int,Double)]()

  //用于保存每一个电影的增强因子数
 
val increMap = scala.collection.mutable.HashMap[Int,Int]()

  //用于保存每一个电影的减弱因子数
 
val decreMap = scala.collection.mutable.HashMap[Int,Int]()

  for (topSimMovie <- topSimMovies; userRecentlyRating <- userRecentlyRatings){
    val simScore = getMoviesSimScore(simMovies,userRecentlyRating._1,topSimMovie)
    if(simScore > 0.6){
      score += ((topSimMovie, simScore * userRecentlyRating._2 ))
      if(userRecentlyRating._2 > 3){
        increMap(topSimMovie) = increMap.getOrDefault(topSimMovie,0) + 1
      }else{
        decreMap(topSimMovie) = decreMap.getOrDefault(topSimMovie,0) + 1
      }
    }
  }

  score.groupBy(_._1).map{case (mid,sims) =>
    (mid,sims.map(_._2).sum / sims.length + log(increMap.getOrDefault(mid, 1)) – log(decreMap.getOrDefault(mid, 1)))
  }.toArray.sortWith(_._2>_._2)

}

其中,getMovieSimScore是取候选电影和已评分电影的相似度,代码如下:

/**
  *
获取当个电影之间的相似度
 
* @param simMovies       电影相似度矩阵
 
* @param userRatingMovie 用户已经评分的电影
 
* @param topSimMovie     候选电影
 
* @return
 
*/
def getMoviesSimScore(simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]], userRatingMovie:Int, topSimMovie:Int): Double ={
  simMovies.get(topSimMovie) match {
    case Some(sim) => sim.get(userRatingMovie) match {
      case Some(score) => score
      case None => 0.0
    }
    case None => 0.0
  }
}

而log是对数运算,这里实现为取10的对数(常用对数):

//10的对数
def log(m:Int):Double ={
  math.log(m) / math.log(10)
}

4 将结果保存到mongoDB

saveRecsToMongoDB函数实现了结果的保存:

/**
  *
将数据保存到MongoDB    uid -> 1,  recs -> 22:4.5|45:3.8
  *
@param streamRecs  流式的推荐结果
 
* @param mongConfig  MongoDB的配置
 
*/
def saveRecsToMongoDB(uid:Int,streamRecs:Array[(Int,Double)])(implicit mongConfig: MongConfig): Unit ={
  //StreamRecs的连接
 
val streaRecsCollection = ConnHelper.mongoClient(mongConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

  streaRecsCollection.findAndRemove(MongoDBObject(“uid” -> uid))
  streaRecsCollection.insert(MongoDBObject(“uid” -> uid, “recs” ->          streamRecs.map( x => MongoDBObject(“mid”->x._1,“score”->x._2)) ))
}

5 更新实时推荐结果

当计算出候选电影的推荐优先级的数组updatedRecommends<movieId, E>后,这个数组将被发送到Web 后台服务器,与后台服务器上userId 的上次实时推荐结果recentRecommends<movieId, E>进行合并、替换并选出优先级E 前K大的电影作为本次新的实时推荐。具体而言:

a.合并:将updatedRecommends 与recentRecommends 并集合成为一个新的<movieId, E>数组;

b.替换(去重):当updatedRecommends 与recentRecommends 有重复的电影movieId 时,recentRecommends 中movieId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的updatedRecommends的movieId 的推荐优先级;

c.选取TopK:在合并、替换后的<movieId, E>数组上,根据每个movie 的推荐优先级,选择出前K 大的电影,作为本次实时推荐的最终结果。

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)