Flink进阶之滑动窗口统计实时热门商品

发布时间:2021年09月13日作者:atguigu浏览次数:1,449

思路分析

我们实现一个“实时热门商品”的需求,可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前N个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出事件时间戳,基于事件时间做窗口;
  • 过滤出点击行为数据;
  • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window);
  • 按每个窗口聚合,输出每个窗口点击量TopN的商品。

代码主体

在src/main/java/beans下定义POJOs:UserBehavior和ItemViewCount。创建类HotItems,在main方法中创建
StreamExecutionEnvironment并做配置,然后从UserBehavior.csv文件中读取数据,并包装成UserBehavior类型。代码如下:

UserBehavior POJO类:

 
public class UserBehavior {
    // 定义私有属性
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;
    private Long timestamp;


    public UserBehavior() {
    }


    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }


    public Long getUserId() {
        return userId;
    }


    public void setUserId(Long userId) {
        this.userId = userId;
    }


    public Long getItemId() {
        return itemId;
    }


    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }


    public Integer getCategoryId() {
        return categoryId;
    }


    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }


    public String getBehavior() {
        return behavior;
    }


    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }


    public Long getTimestamp() {
        return timestamp;
    }


    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }


    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

ItemViewCount POJO类:

 
public class ItemViewCount {
    private Long itemId;
    private Long windowEnd;
    private Long count;


    public ItemViewCount() {
    }


    public ItemViewCount(Long itemId, Long windowEnd, Long count) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.count = count;
    }


    public Long getItemId() {
        return itemId;
    }


    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }


    public Long getWindowEnd() {
        return windowEnd;
    }


    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }


    public Long getCount() {
        return count;
    }


    public void setCount(Long count) {
        this.count = count;
    }


    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemId=" + itemId +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
读取数据并进行类型转换:
       // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        // 2. 读取数据,创建DataStream
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv");


        // 3. 转换为POJO,分配时间戳和watermark
        DataStream<UserBehavior> dataStream = inputStream
                .map(line -> {
                    String[] fields = line.split(",");
                    return new UserBehavior(new Long(fields[0]), new Long(fields[1]), new Integer(fields[2]), fields[3], new Long(fields[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

这里注意,我们需要统计业务时间上的每小时的点击量,所以要基于EventTime来处理。那么如果让Flink按照我们想要的业务时间来处理呢?这里主要有两件事情要做。第一件是告诉Flink我们现在按照EventTime模式进行处理,Flink默认使用ProcessingTime处理,所以我们要显式设置如下:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成Watermark。Watermark是用来追踪业务事件的概念,可以理解成EventTime世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做Watermark。这里我们用 assignAscendingTimestamps来实现时间戳的抽取和Watermark的生成。

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {                    @Override
                    public long extractAscendingTimestamp(UserBehavior element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前N个商品”。由于原始数据中存在点击、购买、收藏、喜欢各种行为的数据,但是我们只需要统计点击量,所以先使用filter将点击行为数据过滤出来。

.filter(data -> "pv".equals(data.getBehavior()))

设置滑动窗口,统计点击量

由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计[09:00, 10:00), [09:05,10:05), [09:10, 10:10)…等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

.keyBy("itemId")    // 按商品ID分组
.timeWindow(Time.hours(1), Time.minutes(5))    // 开滑窗
.aggregate(new ItemCountAgg(), new WindowItemCountResult());

我们使用.keyBy(“itemId”)对商品进行分组,使用.timeWindow(Timesize, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf)做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少state的存储压力。较之 .apply(WindowFunctionwf)会将窗口中的数据都存储下来,最后一起计算要高效地多。这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

 
public static class ItemCountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }


    @Override
    public Long add(UserBehavior value, Long accumulator) {
        return accumulator + 1;
    }


    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }


    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

聚合操作.aggregate(AggregateFunctionaf, WindowFunction wf)的第二个参数WindowFunction将每个key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将<主键商品ID,窗口,点击量>封装成了ItemViewCount进行输出。

代码如下:

 
public static class WindowItemCountResult implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        Long itemId = tuple.getField(0);
        Long windowEnd = window.getEnd();
        Long count = input.iterator().next();
        out.collect(new ItemViewCount(itemId, windowEnd, count));
    }
}

现在我们就得到了每个商品在每个窗口的点击量的数据流。

计算最热门TopN商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用ProcessFunction实现一个自定义的TopN函数TopNHotItems来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。

.keyBy("windowEnd")    // 按照窗口分组
.process(new TopNHotItems(5));   // 用自定义处理函数排序取前5

ProcessFunction是Flink提供的一个low-level API,用于实现更高级的功能。它主要提供了定时器timer的功能(支持EventTime或ProcessingTime)。本案例中我们将利用timer来判断何时收齐了某个window下所有商品的点击量数据。由于Watermark的进度是全局的,在processElement方法中,每当收到一条数据ItemViewCount,我们就注册一个windowEnd+1的定时器(Flink框架会自动忽略同一时间的重复注册)。windowEnd+1的定时器被触发时,意味着收到了windowEnd+1的Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在onTimer()中处理将收集的所有商品及点击量进行排序,选出TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了ListState<ItemViewCount>来存储收到的每条ItemViewCount消息,保证在发生故障时,状态数据的不丢失和一致性。ListState是Flink提供的类似Java List接口的State API,它集成了框架的checkpoint机制,自动做到了exactly-once的语义保证。

 
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String>{
    // 定义属性,top n的大小
    private Integer topSize;


    public TopNHotItems(Integer topSize) {
        this.topSize = topSize;
    }


    // 定义列表状态,保存当前窗口内所有输出的ItemViewCount
    ListState<ItemViewCount> itemViewCountListState;


    @Override
    public void open(Configuration parameters) throws Exception {
        itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("item-view-count-list", ItemViewCount.class));
    }


    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
        // 每来一条数据,存入List中,并注册定时器
        itemViewCountListState.add(value);
        ctx.timerService().registerEventTimeTimer( value.getWindowEnd() + 1 );
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 定时器触发,当前已收集到所有数据,排序输出
        ArrayList<ItemViewCount> itemViewCounts = Lists.newArrayList(itemViewCountListState.get().iterator());


        itemViewCounts.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return o2.getCount().intValue() - o1.getCount().intValue();
            }
        });


        // 将排名信息格式化成String,方便打印输出
        StringBuilder resultBuilder = new StringBuilder();
        resultBuilder.append("===================================\n");
        resultBuilder.append("窗口结束时间:").append( new Timestamp(timestamp - 1)).append("\n");


        // 遍历列表,取top n输出
        for( int i = 0; i < Math.min(topSize, itemViewCounts.size()); i++ ){
            ItemViewCount currentItemViewCount = itemViewCounts.get(i);
            resultBuilder.append("NO ").append(i+1).append(":")
                    .append(" 商品ID = ").append(currentItemViewCount.getItemId())
                    .append(" 热门度 = ").append(currentItemViewCount.getCount())
                    .append("\n");
        }
        resultBuilder.append("===============================\n\n");


        // 控制输出频率
        Thread.sleep(1000L);


        out.collect(resultBuilder.toString());
    }
}

最后我们可以在main函数中将结果打印输出到控制台,方便实时观测:

.print();

至此整个程序代码全部完成,我们直接运行main函数,就可以在控制台看到不断输出的各个时间点统计出的热门商品。

更换Kafka作为数据源

实际生产环境中,我们的数据流往往是从Kafka获取到的。如果要让代码更贴近生产实际,我们只需将source更换为Kafka即可:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("
想要了解跟多关于

大数据培训

课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)