大数据项目之电商最近一小时广告点击量

发布时间:2021年12月02日作者:atguigu浏览次数:353

需求八:最近一小时广告点击量

统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量

Redis存储数据结构如图所示:

大数据培训电商项目

1 思路分析

使用窗口函数进行最近一小时的数据统计,并将结果写入Redis。

2 代码实现

1)LastHourAdsHandler

object LastHourAdsHandler {

    def statLastHourAds(filteredDStream: DStream[AdsInfo]) = {

 

        val dateFormatter = new SimpleDateFormat(“HH:mm”)

 

        // 1. 利用窗口来对DStream进行开窗

        val DStreamWithWindow: DStream[AdsInfo] = filteredDStream.window(Minutes(2), Seconds(4))

 

        val hourMinutesCount = DStreamWithWindow.map(adsInfo => {

            ((adsInfo.adsId, dateFormatter.format(new Date(adsInfo.ts))), 1)

        }).reduceByKey(_ + _).map {

            case ((adsId, hourMinutes), count) => (adsId, (hourMinutes, count))

        }

 

        // 2. 转成json格式的字符串

        val adsIdHourMintesJson: DStream[(String, String)] = hourMinutesCount.groupByKey.map {

            case (adsId, hourMinutsCountIt) => {

                import org.json4s.JsonDSL._

                (adsId, JsonMethods.compact(JsonMethods.render(hourMinutsCountIt)))

            }

        }

 

        // 3. 写入redis

        adsIdHourMintesJson.foreachRDD(rdd => {

            val client: Jedis = RedisUtil.getJedisClient

            val result: Array[(String, String)] = rdd.collect

            result.foreach(println)

            import scala.collection.JavaConversions._

            client.hmset(“last:hour:ads:click”, result.toMap)

            client.close()

        })

    }

}

 

2)RealtimeApp

object RealtimeApp {

    def main(args: Array[String]): Unit = {

 

        // 从kafka中读出我们需要数据

        // 1. 创建 SparkConf 对象

        val conf: SparkConf = new SparkConf()

            .setAppName(“RealTimeApp”)

            .setMaster(“local[*]”)

        // 2. 创建 SparkContext 对象

        val sc = new SparkContext(conf)

        // 3. 创建 StreamingContext

        val ssc = new StreamingContext(sc, Seconds(2))

        // 4. 得到 DStream

        val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, “ads_log”)

 

        // 5. 为了方便后面的计算, 把消费到的字符串封装到对象中

        val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {

            record =>

                val split: Array[String] = record.value.split(“,”)

                AdsInfo(split(0).toLong, split(1), split(2), split(3), split(4))

        }

 

        // 6: 需求5:

        val filteredDStream: DStream[AdsInfo] = BlackListApp.filterBlackList(adsInfoDStream, sc)

        BlackListApp.checkUserToBlackList(filteredDStream)

 

        // 7. 需求6:

        val dayAreaAdsCityCount: DStream[(String, Int)] = DayAreaCityAdsApp.statAreaCityAdsPerDay(filteredDStream, sc)

 

        // 8. 需求 7

        AreaAdsTop3.statAreaAdsTop3(dayAreaAdsCityCount)

 

        // 9. 需求8

        LastHourAdsApp.statLastHourAds(filteredDStream)

        ssc.start()

        ssc.awaitTermination()

    }

}

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)