Requirement 7: Daily Hot Ad Click Counts per Region
Compute the daily top-3 hot ads for each region. The Redis data format is shown below:
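The original figure is not reproduced here; judging from the write path in the code below, each day gets one Redis hash whose fields are areas and whose values are JSON strings holding that area's top-3 (adsId, count) pairs. The counts in this sketch are illustrative:

key:   area:ads:top3:2019-03-11   (hash)
field: 华南    value: [{"2":1200},{"5":980},{"1":760}]
field: 华北    value: [{"3":1100},{"2":860},{"6":540}]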
1 Approach
Build this metric on top of the per-day ad click counts that requirement 6 already computes in real time; the data flow for a single record is traced below.
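Assume requirement 6 emits keys of the form day:area:city:adsId, e.g. ("2019-03-11:华南:深圳:1", 2). Each record is then transformed as follows (the JSON shape matches what json4s renders for a list of pairs):

(2019-03-11:华南:深圳:1, 2)                 -- input from requirement 6
(2019-03-11:华南:1, 2)                      -- drop the city, then reduceByKey
(2019-03-11, (华南, (1, 2)))                -- re-key by day
(2019-03-11, Map(华南 -> [{"1":2}, ...]))   -- groupByKey, top 3 per area, render as JSON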
2 Code Implementation
1) AreaAdsTop3
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

object AreaAdsTop3 {
  def statAreaAdsTop3(dayAreaAdsCityCount: DStream[(String, Int)]): Unit = {
    // 1. Drop the city and aggregate the ad click count per (day, area, adsId)
    val dayAreaAdsCountDStream: DStream[(String, (String, (String, Int)))] = dayAreaAdsCityCount.map {
      case (dayAreaAdsCity, count) =>
        // key format from requirement 6: 2019-03-11:华南:深圳:1, value: 2
        val split: Array[String] = dayAreaAdsCity.split(":")
        // keep day:area:adsId, drop the city (index 2)
        (s"${split(0)}:${split(1)}:${split(3)}", count)
    }.reduceByKey(_ + _).map {
      // DStream[(day, (area, (adsId, count)))]
      case (dayAreaAds, count) =>
        val split: Array[String] = dayAreaAds.split(":")
        (split(0), (split(1), (split(2), count)))
    }

    // 2. Group by day: DStream[(day, Iterable[(area, (adsId, count))])]
    val groupByDayDStream: DStream[(String, Iterable[(String, (String, Int))])] = dayAreaAdsCountDStream.groupByKey

    // (day, Map[area, JSON string of the top-3 (adsId, count) pairs])
    val resultDStream: DStream[(String, Map[String, String])] = groupByDayDStream.map {
      case (day, it: Iterable[(String, (String, Int))]) =>
        // Map[area, Iterable[(area, (adsId, count))]]
        val temp1: Map[String, Iterable[(String, (String, Int))]] = it.groupBy(_._1)
        // Drop the redundant area from the values
        val temp2: Map[String, Iterable[(String, Int)]] = temp1.map {
          case (area, adsIt) => (area, adsIt.map(_._2))
        }
        // Sort each area's ads by click count, keep the top 3, and render them as a JSON string
        val temp3: Map[String, String] = temp2.map {
          case (area, adsIt) =>
            val list: List[(String, Int)] = adsIt.toList.sortWith(_._2 > _._2).take(3)
            import org.json4s.JsonDSL._
            val adsCountJsonString: String = JsonMethods.compact(JsonMethods.render(list))
            (area, adsCountJsonString)
        }
        (day, temp3)
    }

    // 3. Write the result to Redis
    resultDStream.foreachRDD(rdd => {
      val dayAreaAdsCountArray: Array[(String, Map[String, String])] = rdd.collect
      val client: Jedis = RedisUtil.getJedisClient
      dayAreaAdsCountArray.foreach {
        case (day, map) =>
          // Implicitly converts the Scala map into the java.util.Map that hmset expects
          import scala.collection.JavaConversions._
          client.hmset("area:ads:top3:" + day, map)
      }
      client.close()
    })
  }
}
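To spot-check the output, the hash can be read back with Jedis hgetAll. This is a minimal sketch that reuses the project's RedisUtil helper; the date is illustrative:

// Minimal verification sketch; RedisUtil is the project's helper, the date is illustrative
import scala.collection.JavaConversions._
import redis.clients.jedis.Jedis

val client: Jedis = RedisUtil.getJedisClient
val top3 = client.hgetAll("area:ads:top3:2019-03-11") // java.util.Map[String, String]
top3.foreach { case (area, json) => println(s"$area -> $json") } // iterated as a Scala map via JavaConversions
client.close()

Two notes on the write path: collecting each batch to the driver with rdd.collect is acceptable here because the per-batch result is tiny (one entry per day), and scala.collection.JavaConversions is deprecated since Scala 2.12, where scala.collection.JavaConverters with .asJava is the preferred replacement.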
2) RealtimeApp
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // Read the data we need from Kafka
    // 1. Create the SparkConf object
    val conf: SparkConf = new SparkConf()
      .setAppName("RealTimeApp")
      .setMaster("local[*]")
    // 2. Create the SparkContext object
    val sc = new SparkContext(conf)
    // 3. Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    // 4. Get the DStream from the ads_log topic
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, "ads_log")
    // 5. Wrap every consumed string in a case class to simplify the later computations
    val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {
      record =>
        val split: Array[String] = record.value.split(",")
        AdsInfo(split(0).toLong, split(1), split(2), split(3), split(4))
    }
    // 6. Requirement 5: blacklist handling
    val filteredDStream: DStream[AdsInfo] = BlackListApp.filterBlackList(adsInfoDStream, sc)
    BlackListApp.checkUserToBlackList(filteredDStream)
    // 7. Requirement 6: per-day area/city ad click counts
    val dayAreaAdsCityCount: DStream[(String, Int)] = DayAreaCityAdsApp.statAreaCityAdsPerDay(filteredDStream, sc)
    // 8. Requirement 7: daily top-3 ads per area
    AreaAdsTop3.statAreaAdsTop3(dayAreaAdsCityCount)
    ssc.start()
    ssc.awaitTermination()
  }
}
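AdsInfo and MyKafkaUtil come from earlier parts of this project and are not shown in this section. The sketch below is only an assumption consistent with how they are used above: the AdsInfo field names are inferred from the day:area:city:adsId key format, and getDStream is a plausible wrapper over spark-streaming-kafka-0-10 with placeholder broker and group settings.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Field names are assumptions inferred from usage: timestamp first, then area, city, userId, adsId
case class AdsInfo(ts: Long, area: String, city: String, userId: String, adsId: String)

object MyKafkaUtil {
  // A plausible implementation, not the project's exact source; broker address and group id are placeholders
  def getDStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, String]] = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ads_log_group",
      "auto.offset.reset" -> "latest"
    )
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams)
    )
  }
}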