EvnetTimeWindow API
1 滚动窗口(TumblingEventTimeWindows)
def main(args: Array[String]): Unit = { // 环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dstream: DataStream[String] = env.socketTextStream(“hadoop1”,7777) val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(” “) (arr(0), arr(1).toLong, 1) } val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }) val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0) textKeyStream.print(“textkey:”) val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2))) val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) => set += ts } groupDstream.print(“window::::”).setParallelism(1) env.execute() } } |
结果是按照Event Time的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
上一篇: 大数据培训课程之EventTime
下一篇: 前端培训面试题分析-浏览器端存储有哪些,他们的区别是什么