当用户u 对电影p 进行了评分,将触发一次对u 的推荐结果的更新。由于用户u 对电影p 评分,对于用户u 来说,他与p 最相似的电影们之间的推荐强度将发生变化,所以选取与电影p 最相似的K 个电影作为候选电影。
每个候选电影按照“推荐优先级”这一权重作为衡量这个电影被推荐给用户u 的优先级。
这些电影将根据用户u 最近的若干评分计算出各自对用户u 的推荐优先级,然后与上次对用户u 的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。
具体来说:
首先,获取用户u 按时间顺序最近的K 个评分,记为RK;获取电影p 的最相似的K 个电影集合,记为S;
然后,对于每个电影qS ,计算其推荐优先级,计算公式如下:
其中:
表示用户u 对电影r 的评分;
sim(q,r)表示电影q 与电影r 的相似度,设定最小相似度为0.6,当电影q和电影r 相似度低于0.6 的阈值,则视为两者不相关并忽略;
sim_sum 表示q 与RK 中电影相似度大于最小阈值的个数;
incount 表示RK 中与电影q 相似的、且本身评分较高(>=3)的电影个数;
recount 表示RK 中与电影q 相似的、且本身评分较低(<3)的电影个数;
公式的意义如下:
首先对于每个候选电影q,从u 最近的K 个评分中,找出与q 相似度较高(>=0.6)的u 已评分电影们,对于这些电影们中的每个电影r,将r 与q 的相似度乘以用户u 对r 的评分,将这些乘积计算平均数,作为用户u 对电影q 的评分预测即
然后,将u 最近的K 个评分中与电影q 相似的、且本身评分较高(>=3)的电影个数记为 incount,计算lgmax{incount,1}作为电影 q 的“增强因子”,意义在于电影q 与u 的最近K 个评分中的n 个高评分(>=3)电影相似,则电影q 的优先级被增加lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的高评分电影越多,也就是说n 越大,则电影q 更应该被推荐,所以推荐优先级被增强的幅度较大;如果电影q 与u 的最近K 个评分中相似的高评分电影越少,也就是n 越小,则推荐优先级被增强的幅度较小;
而后,将u 最近的K 个评分中与电影q 相似的、且本身评分较低(<3)的电影个数记为 recount,计算lgmax{recount,1}作为电影 q 的“削弱因子”,意义在于电影q 与u 的最近K 个评分中的n 个低评分(<3)电影相似,则电影q 的优先级被削减lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的低评分电影越多,也就是说n 越大,则电影q 更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果电影q 与u 的最近K 个评分中相似的低评分电影越少,也就是n 越小,则推荐优先级被减弱的幅度较小;
最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的q 电影对于u 的推荐优先级。在计算完每个候选电影q 的后,将生成一组<电影q 的ID, q 的推荐优先级>的列表updatedList:
而在本次为用户u 实时推荐之前的上一次实时推荐结果Rec 也是一组<电影m,m 的推荐优先级>的列表,其大小也为K:
接下来,将updated_S 与本次为u 实时推荐之前的上一次实时推荐结果Rec进行基于合并、替换形成新的推荐结果NewRec:
其中,i表示updated_S 与Rec 的电影集合中的每个电影,topK 是一个函数,表示从 Recupdated _ S中选择出最大的 K 个电影,cmp = 表示topK 函数将推荐优先级值最大的K 个电影选出来。最终,NewRec 即为经过用户u 对电影p 评分后触发的实时推荐得到的最新推荐结果。
总之,实时推荐算法流程流程基本如下:
(1)用户u 对电影p 进行了评分,触发了实时推荐的一次计算;
(2)选出电影p 最相似的K 个电影作为集合S;
(3)获取用户u 最近时间内的K 条评分,包含本次评分,作为集合RK;
(4)计算电影的推荐优先级,产生<qID,>集合updated_S;
将updated_S 与上次对用户u 的推荐结果Rec 利用公式(4-4)进行合并,产生新的推荐结果NewRec;作为最终输出。
我们在recommender下新建子项目StreamingRecommender,引入spark、scala、mongo、redis和kafka的依赖:
<dependencies>
<!– Spark的依赖引入 –>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<!– 引入Scala –>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!– 加入MongoDB的驱动 –>
<!– 用于代码方式连接MongoDB –>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!– 用于Spark和MongoDB的对接 –>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!– redis –>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!– kafka –>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
代码中首先定义样例类和一个连接助手对象(用于建立redis和mongo连接),并在StreamingRecommender中定义一些常量:
src/main/scala/com.atguigu.streaming/StreamingRecommender.scala
// 连接助手对象object ConnHelper extends Serializable{
lazy val jedis = new Jedis(“localhost”)
lazy val mongoClient = MongoClient(MongoClientURI(“mongodb://localhost:27017/recommender”))
}
case class MongConfig(uri:String,db:String)
// 标准推荐
case class Recommendation(mid:Int, score:Double)
// 用户的推荐
case class UserRecs(uid:Int, recs:Seq[Recommendation])
//电影的相似度
case class MovieRecs(mid:Int, recs:Seq[Recommendation]) object StreamingRecommender {
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_MOVIES_NUM = 20
val MONGODB_STREAM_RECS_COLLECTION = “StreamRecs”
val MONGODB_RATING_COLLECTION = “Rating”
val MONGODB_MOVIE_RECS_COLLECTION = “MovieRecs”//入口方法def main(args: Array[String]): Unit = {}}
实时推荐主体代码如下:
def main(args: Array[String]): Unit = {
val config = Map(
“spark.cores” -> “local[*]”,
“mongo.uri” -> “mongodb://localhost:27017/recommender”,
“mongo.db” -> “recommender”,
“kafka.topic” -> “recommender”
)
//创建一个SparkConf配置
val sparkConf = new SparkConf().setAppName(“StreamingRecommender”).setMaster(config(“spark.cores”))
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc,Seconds(2))
implicit val mongConfig = MongConfig(config(“mongo.uri”),config(“mongo.db”))
import spark.implicits._
// 广播电影相似度矩阵
//装换成为 Map[Int, Map[Int,Double]]
val simMoviesMatrix = spark
.read
.option(“uri”,config(“mongo.uri”))
.option(“collection”,MONGODB_MOVIE_RECS_COLLECTION)
.format(“com.mongodb.spark.sql”)
.load()
.as[MovieRecs]
.rdd
.map{recs =>
(recs.mid,recs.recs.map(x=> (x.mid,x.score)).toMap)
}.collectAsMap()
val simMoviesMatrixBroadCast = sc.broadcast(simMoviesMatrix)
//创建到Kafka的连接
val kafkaPara = Map(
“bootstrap.servers” -> “localhost:9092”,
“key.deserializer” -> classOf[StringDeserializer],
“value.deserializer” -> classOf[StringDeserializer],
“group.id” -> “recommender”,
“auto.offset.reset” -> “latest”
)
val kafkaStream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(config(“kafka.topic”)),kafkaPara))
// UID|MID|SCORE|TIMESTAMP
// 产生评分流
val ratingStream = kafkaStream.map{case msg=>
var attr = msg.value().split(“\\|”)
(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)
}
// 核心实时推荐算法
ratingStream.foreachRDD{rdd =>
rdd.map{case (uid,mid,score,timestamp) =>
println(“>>>>>>>>>>>>>>>>”)
//获取当前最近的M次电影评分
val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM,uid,ConnHelper.jedis)
//获取电影P最相似的K个电影
val simMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM,mid,uid,simMoviesMatrixBroadCast.value)
//计算待选电影的推荐优先级
val streamRecs = computeMovieScores(simMoviesMatrixBroadCast.value,userRecentlyRatings,simMovies)
//将数据保存到MongoDB
saveRecsToMongoDB(uid,streamRecs)
}.count()
}
//启动Streaming程序
ssc.start()
ssc.awaitTermination()
}
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。
上一篇: 大数据培训项目实时推荐服务
下一篇: 大数据培训项目实时推荐算法的实现