大数据培训项目实时推荐算法设计

发布时间:2021年12月21日作者:atguigu浏览次数:778

当用户u 对电影p 进行了评分,将触发一次对u 的推荐结果的更新。由于用户u 对电影p 评分,对于用户u 来说,他与p 最相似的电影们之间的推荐强度将发生变化,所以选取与电影p 最相似的K 个电影作为候选电影。

每个候选电影按照“推荐优先级”这一权重作为衡量这个电影被推荐给用户u 的优先级。

这些电影将根据用户u 最近的若干评分计算出各自对用户u 的推荐优先级,然后与上次对用户u 的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。

具体来说:

首先,获取用户u 按时间顺序最近的K 个评分,记为RK;获取电影p 的最相似的K 个电影集合,记为S;

然后,对于每个电影qS ,计算其推荐优先级,计算公式如下:

大数据培训项目

    其中:

表示用户u 对电影r 的评分;

sim(q,r)表示电影q 与电影r 的相似度,设定最小相似度为0.6,当电影q和电影r 相似度低于0.6 的阈值,则视为两者不相关并忽略;

sim_sum 表示q 与RK 中电影相似度大于最小阈值的个数;

incount 表示RK 中与电影q 相似的、且本身评分较高(>=3)的电影个数;

recount 表示RK 中与电影q 相似的、且本身评分较低(<3)的电影个数;

公式的意义如下:

首先对于每个候选电影q,从u 最近的K 个评分中,找出与q 相似度较高(>=0.6)的u 已评分电影们,对于这些电影们中的每个电影r,将r 与q 的相似度乘以用户u 对r 的评分,将这些乘积计算平均数,作为用户u 对电影q 的评分预测即

大数据培训项目

然后,将u 最近的K 个评分中与电影q 相似的、且本身评分较高(>=3)的电影个数记为 incount,计算lgmax{incount,1}作为电影 q 的“增强因子”,意义在于电影q 与u 的最近K 个评分中的n 个高评分(>=3)电影相似,则电影q 的优先级被增加lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的高评分电影越多,也就是说n 越大,则电影q 更应该被推荐,所以推荐优先级被增强的幅度较大;如果电影q 与u 的最近K 个评分中相似的高评分电影越少,也就是n 越小,则推荐优先级被增强的幅度较小;

而后,将u 最近的K 个评分中与电影q 相似的、且本身评分较低(<3)的电影个数记为 recount,计算lgmax{recount,1}作为电影 q 的“削弱因子”,意义在于电影q 与u 的最近K 个评分中的n 个低评分(<3)电影相似,则电影q 的优先级被削减lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的低评分电影越多,也就是说n 越大,则电影q 更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果电影q 与u 的最近K 个评分中相似的低评分电影越少,也就是n 越小,则推荐优先级被减弱的幅度较小;

最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的q 电影对于u 的推荐优先级。在计算完每个候选电影q 的后,将生成一组<电影q 的ID, q 的推荐优先级>的列表updatedList:

大数据培训项目

而在本次为用户u 实时推荐之前的上一次实时推荐结果Rec 也是一组<电影m,m 的推荐优先级>的列表,其大小也为K:

大数据培训项目

接下来,将updated_S 与本次为u 实时推荐之前的上一次实时推荐结果Rec进行基于合并、替换形成新的推荐结果NewRec:

大数据培训项目

其中,i表示updated_S 与Rec 的电影集合中的每个电影,topK 是一个函数,表示从 Recupdated _ S中选择出最大的 K 个电影,cmp =  表示topK 函数将推荐优先级值最大的K 个电影选出来。最终,NewRec 即为经过用户u 对电影p 评分后触发的实时推荐得到的最新推荐结果。

总之,实时推荐算法流程流程基本如下:

(1)用户u 对电影p 进行了评分,触发了实时推荐的一次计算;

(2)选出电影p 最相似的K 个电影作为集合S;

(3)获取用户u 最近时间内的K 条评分,包含本次评分,作为集合RK;

(4)计算电影的推荐优先级,产生<qID,>集合updated_S;

将updated_S 与上次对用户u 的推荐结果Rec 利用公式(4-4)进行合并,产生新的推荐结果NewRec;作为最终输出。

我们在recommender下新建子项目StreamingRecommender,引入spark、scala、mongo、redis和kafka的依赖:

<dependencies>
    <!– Spark的依赖引入 –>
   
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
    </dependency>
    <!– 引入Scala –>
   
<dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>

    <!– 加入MongoDB的驱动 –>
    <!–
用于代码方式连接MongoDB –>
   
<dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>${casbah.version}</version>
    </dependency>
    <!– 用于SparkMongoDB的对接 –>
   
<dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>${mongodb-spark.version}</version>
    </dependency>

    <!– redis –>
   
<dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    <!– kafka –>
   
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

</dependencies>

代码中首先定义样例类和一个连接助手对象(用于建立redis和mongo连接),并在StreamingRecommender中定义一些常量:

src/main/scala/com.atguigu.streaming/StreamingRecommender.scala

// 连接助手对象object ConnHelper extends Serializable{
  lazy val jedis = new Jedis(“localhost”)
  lazy val mongoClient = MongoClient(MongoClientURI(“mongodb://localhost:27017/recommender”))
}

case class MongConfig(uri:String,db:String)

// 标准推荐
case class Recommendation(mid:Int, score:Double)

// 用户的推荐
case class UserRecs(uid:Int, recs:Seq[Recommendation])

//电影的相似度
case class MovieRecs(mid:Int, recs:Seq[Recommendation]) object StreamingRecommender {

  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = “StreamRecs”
 
val MONGODB_RATING_COLLECTION = “Rating”
 
val MONGODB_MOVIE_RECS_COLLECTION = “MovieRecs”//入口方法def main(args: Array[String]): Unit = {}}

实时推荐主体代码如下:

def main(args: Array[String]): Unit = {

  val config = Map(
    “spark.cores” -> “local[*]”,
    “mongo.uri” -> “mongodb://localhost:27017/recommender”,
    “mongo.db” -> “recommender”,
    “kafka.topic” -> “recommender”
 
)
  //创建一个SparkConf配置
 
val sparkConf = new SparkConf().setAppName(“StreamingRecommender”).setMaster(config(“spark.cores”))
 
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc,Seconds(2))

  implicit val mongConfig = MongConfig(config(“mongo.uri”),config(“mongo.db”))
  import spark.implicits._

  // 广播电影相似度矩阵
 
//装换成为 Map[Int, Map[Int,Double]]
 
val simMoviesMatrix = spark
    .read
    .option(“uri”,config(“mongo.uri”))
    .option(“collection”,MONGODB_MOVIE_RECS_COLLECTION)
    .format(“com.mongodb.spark.sql”)
    .load()
    .as[MovieRecs]  
   
.rdd
   
.map{recs =>
      (recs.mid,recs.recs.map(x=> (x.mid,x.score)).toMap)
    }.collectAsMap() 
 
val simMoviesMatrixBroadCast = sc.broadcast(simMoviesMatrix)

  //
创建到Kafka的连接
 
val kafkaPara = Map(
    “bootstrap.servers” -> “localhost:9092”,
    “key.deserializer” -> classOf[StringDeserializer],
    “value.deserializer” -> classOf[StringDeserializer],
    “group.id” -> “recommender”,
    “auto.offset.reset” -> “latest”
 
)

  val kafkaStream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(config(“kafka.topic”)),kafkaPara))

  // UID|MID|SCORE|TIMESTAMP
  //
产生评分流
 
val ratingStream = kafkaStream.map{case msg=>
    var attr = msg.value().split(\\|”)
    (attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)
  }

// 核心实时推荐算法
  ratingStream.foreachRDD{rdd =>
    rdd.map{case (uid,mid,score,timestamp) =>
      println(“>>>>>>>>>>>>>>>>”)

      //获取当前最近的M次电影评分
     
val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM,uid,ConnHelper.jedis)

      //获取电影P最相似的K个电影
     
val simMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM,mid,uid,simMoviesMatrixBroadCast.value)

      //计算待选电影的推荐优先级
     
val streamRecs = computeMovieScores(simMoviesMatrixBroadCast.value,userRecentlyRatings,simMovies)

      //将数据保存到MongoDB
     
saveRecsToMongoDB(uid,streamRecs)

    }.count()
  }

  //启动Streaming程序
 
ssc.start()
  ssc.awaitTermination()
}

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)