Big Data Training: Table API and SQL

Published: 2020-06-08  Author: atguigu

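The Table API is a relational API shared by stream and batch processing. The same Table API query can run on streaming or batch input without any modification. The Table API is a superset of the SQL language, designed specifically for Apache Flink, and is a language-integrated API for Scala and Java. In regular SQL a query is specified as a string. Table API queries are instead defined in a language-embedded style in Java or Scala, so the IDE can offer support such as autocompletion and syntax validation.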

1 Required POM dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

   

2 Building the table environment

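import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// StartupLog (case class) and MyKafkaUtil (Kafka consumer helper) are project classes defined elsewhere
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // read the raw JSON start-up logs from the Kafka topic GMALL_STARTUP
  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

  // the table environment is created on top of the stream environment
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  // parse each JSON string into a StartupLog
  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) }

  // DataStream -> Table: the columns are derived from the case class fields
  val startupLogTable: Table = tableEnv.fromDataStream(startupLogDstream)

  // project mid and ch, then keep only the appstore channel
  val table: Table = startupLogTable.select("mid, ch").filter("ch = 'appstore'")

  // Table -> append-only DataStream
  val midchDataStream: DataStream[(String, String)] = table.toAppendStream[(String, String)]

  midchDataStream.print()
  env.execute()
}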
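The sketch below shows what the select/filter query computes, without a Kafka source or a Flink cluster. It applies the same projection and filter to an in-memory list. This is plain Scala, not Flink API. The case class Startup and the object SelectFilter are hypothetical stand-ins for StartupLog and the Table query.

```scala
// Hypothetical, simplified stand-in for the StartupLog case class
case class Startup(mid: String, uid: String, ch: String)

object SelectFilter {
  // Same logic as select("mid, ch").filter("ch = 'appstore'"), applied to a finite list:
  // project each row to (mid, ch), then keep only rows whose channel is appstore
  def midAndChannel(rows: Seq[Startup]): Seq[(String, String)] =
    rows.map(r => (r.mid, r.ch)).filter { case (_, ch) => ch == "appstore" }
}
```

Each input row produces at most one output row, and an output row is never changed afterwards. The result is therefore append-only, which is why the Flink example can convert it with toAppendStream.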

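Dynamic tables

If the element type of the stream is a case class, the table can be generated directly from the structure of the case class:

tableEnv.fromDataStream(startupLogDstream) 

Alternatively, the fields can be named one by one in field order. If the names match existing case-class fields, Flink maps them by name instead:

tableEnv.fromDataStream(startupLogDstream, 'mid, 'uid …….) 

Finally, the dynamic table can be converted back into a stream for output: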

table.toAppendStream[(String,String)]
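The two ways of naming fields can be illustrated without Flink. The sketch assumes Flink's rule for case classes: names that already exist in the case class are matched by name, so they can be reordered or left out, while new names relabel the fields by position. Log and FieldMapping are hypothetical, and the code only mimics the mapping on a single record.

```scala
// Hypothetical record type standing in for StartupLog
case class Log(mid: String, ch: String, ts: Long)

object FieldMapping {
  // Field names of Log in declaration order (what a schema derived from the case class would use)
  val declared: Seq[String] = Seq("mid", "ch", "ts")

  // Position-based: the i-th given name labels the i-th field of the record
  def byPosition(r: Log, names: Seq[String]): Seq[(String, Any)] =
    names.zip(r.productIterator.toSeq)

  // Name-based: look up existing fields by name, in the requested order
  def byName(r: Log, names: Seq[String]): Seq[(String, Any)] = {
    val all: Map[String, Any] = declared.zip(r.productIterator.toSeq).toMap
    names.map(n => n -> all(n))
  }
}
```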

Fields

 A field is referenced by putting a single quote in front of its name, e.g. 'name, 'mid, 'amount (in Scala this is a Symbol literal).
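The single-quote syntax is plain Scala and involves no Flink magic. In Scala 2.11, which Flink 1.7's flink-table_2.11 targets, 'mid is shorthand for Symbol("mid"). As far as I know, the implicit conversions imported with org.apache.flink.table.api.scala._ turn such a Symbol into a field reference by its name. The object FieldSymbols below is hypothetical. It uses Symbol("...") because symbol literals are deprecated in newer Scala versions.

```scala
object FieldSymbols {
  // Equivalent of writing 'mid in Scala 2.11 source
  val mid: Symbol = Symbol("mid")

  // The field list 'name, 'mid, 'amount from the text
  val fields: Seq[Symbol] = Seq(Symbol("name"), Symbol("mid"), Symbol("amount"))

  // The plain field names carried by the symbols
  val names: Seq[String] = fields.map(_.name)
}
```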

3 Getting to know the Table API through an example

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// count the records of each channel in every 10-second window
def main(args: Array[String]): Unit = {
  // stream execution environment (plays the role SparkContext plays in Spark)
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // switch the time characteristic to event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) }
  // tell Flink how to extract the event time and generate watermarks (no out-of-orderness allowed)
  val startupLogWithEventTimeDStream: DataStream[StartupLog] = startupLogDstream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
      override def extractTimestamp(element: StartupLog): Long = element.ts
    }).setParallelism(1)

  // table environment (plays the role SparkSession plays in Spark SQL)
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  // DataStream -> Table; 'ts.rowtime declares ts as the event-time attribute
  val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
    'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

  // count each channel every 10 seconds with the Table API:
  // 1) groupBy  2) a window  3) event time decides which window a record falls into
  val resultTable: Table = startupTable
    .window(Tumble over 10000.millis on 'ts as 'tt)
    .groupBy('ch, 'tt)
    .select('ch, 'ch.count)

  // Table -> DataStream; the Boolean flag marks add (true) and retract (false) messages
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultTable.toRetractStream[(String, Long)]

  resultDstream.filter(_._1).print()

  env.execute()
}
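The windowed count above can be mimicked on a finite list to show what Tumble over 10000.millis on 'ts computes. Each record is assigned to the window that starts at ts - (ts % size), which assumes non-negative millisecond timestamps and no window offset. Records are then counted per (channel, window). This is a plain-Scala sketch of the semantics, not Flink's implementation. Event and TumblingCount are hypothetical names, and the window start is returned only to make the grouping visible.

```scala
// Hypothetical event: channel plus event-time timestamp in milliseconds
case class Event(ch: String, ts: Long)

object TumblingCount {
  // Start of the tumbling window that contains ts (offset 0, non-negative timestamps)
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Mimics window(Tumble over size on 'ts as 'tt).groupBy('ch, 'tt).select('ch, 'ch.count)
  // Returns (channel, window start, count), sorted by window start, then channel
  def countPerChannel(events: Seq[Event], sizeMs: Long): Seq[(String, Long, Long)] =
    events
      .groupBy(e => (e.ch, windowStart(e.ts, sizeMs)))
      .map { case ((ch, start), es) => (ch, start, es.size.toLong) }
      .toSeq
      .sortBy { case (ch, start, _) => (start, ch) }
}
```

With a 10000 ms window, timestamps 1000 and 9999 fall into the window [0, 10000), and 10000 starts the next window.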

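About group by

  1. A groupBy without a time window produces a table whose rows keep being updated as new data arrives, so it can only be converted to a stream with toRetractStream. A windowed groupBy emits each window's result once, after the window closes, so toAppendStream also works there.
  val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String, Long)]
  2. The first field of each toRetractStream element is a Boolean that marks the message type. true is an add message carrying the newest result. false is a retract message that withdraws an earlier result that is now outdated. To print only the current results, keep the true messages:
  rDstream.filter(_._1).print()
  3. If the query uses a time window, the window alias (here 'tt) must be included in groupBy.
  val table: Table = startupLogTable.filter("ch = 'appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch, 'tt).select("ch, ch.count")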
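The add/retract flags are easiest to see with a non-windowed count per channel. In the sketch below, each new row first retracts the channel's previous count, if there was one, and then adds the updated count. Filtering on the flag then leaves the sequence of current results. This is a plain-Scala model of the message pattern described in the list, not Flink code. RetractSim and retractMessages are hypothetical names.

```scala
import scala.collection.mutable

object RetractSim {
  // Mimics toRetractStream on a non-windowed groupBy(ch).select(ch, count):
  // for every incoming row, emit (false, previous count) if the channel was seen before,
  // then (true, updated count)
  def retractMessages(channels: Seq[String]): Seq[(Boolean, (String, Long))] = {
    val counts = mutable.Map.empty[String, Long]
    channels.flatMap { ch =>
      val old = counts.getOrElse(ch, 0L)
      counts(ch) = old + 1
      val retract: Seq[(Boolean, (String, Long))] =
        if (old > 0) Seq((false, (ch, old))) else Seq.empty
      retract :+ ((true, (ch, old + 1)))
    }
  }
}
```

For the input appstore, appstore, huawei, the second appstore row produces (false, (appstore, 1)) followed by (true, (appstore, 2)).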

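About time windows

  1. A time window needs a time attribute, and that attribute must be declared when the dynamic table is created. For event time, timestamps and watermarks must first be assigned on the DataStream. The timestamp field is then marked with .rowtime:
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream, 'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)
  2. For processing time, simply append an extra field marked with .proctime when creating the dynamic table:
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream, 'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ps.proctime)
  3. A tumbling window is defined with Tumble over <size> on <time attribute> as <alias>. For example, a 10-second window on the event-time field ts:
  val table: Table = startupLogTable.filter("ch = 'appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch, 'tt).select("ch, ch.count")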
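Event-time windows depend on the watermarks assigned before the table is created. The sketch below models two rules. First, a bounded-out-of-orderness extractor's watermark is the highest timestamp seen so far minus the allowed delay. Second, an event-time window [start, end) is treated as complete once the watermark reaches end - 1. These rules are my reading of Flink's BoundedOutOfOrdernessTimestampExtractor and event-time trigger, written as plain Scala under that assumption. WatermarkSim is a hypothetical name.

```scala
object WatermarkSim {
  // Watermark of a bounded-out-of-orderness extractor: highest timestamp seen minus the allowed delay
  def watermark(maxSeenTs: Long, maxOutOfOrdernessMs: Long): Long =
    maxSeenTs - maxOutOfOrdernessMs

  // An event-time window [start, end) can fire once the watermark reaches its last millisecond
  def windowFires(windowEnd: Long, wm: Long): Boolean =
    wm >= windowEnd - 1
}
```

The example in section 3 uses Time.seconds(0L), so the watermark equals the largest timestamp seen. The window [0, 10000) therefore fires as soon as a record with ts of at least 9999 arrives. With a 2-second delay, it would wait until a timestamp of at least 11999 arrives.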

4 Writing SQL

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

def main(args: Array[String]): Unit = {
  // stream execution environment (plays the role SparkContext plays in Spark)
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // switch the time characteristic to event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) }
  // tell Flink how to extract the event time and generate watermarks
  val startupLogWithEventTimeDStream: DataStream[StartupLog] = startupLogDstream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
      override def extractTimestamp(element: StartupLog): Long = element.ts
    }).setParallelism(1)

  // table environment (plays the role SparkSession plays in Spark SQL)
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  // DataStream -> Table; 'ts.rowtime declares ts as the event-time attribute
  val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
    'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

  // Table API version of the same query, for comparison:
  // startupTable.window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch, 'tt).select('ch, 'ch.count)

  // the same query in SQL; concatenating startupTable calls Table.toString,
  // which registers the table under a generated name and puts that name into the query
  val resultSQLTable: Table = tableEnv.sqlQuery(
    "select ch, count(ch) from " + startupTable + " group by ch, TUMBLE(ts, INTERVAL '10' SECOND)")

  // Table -> DataStream; keep only the add (true) messages
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String, Long)]

  resultDstream.filter(_._1).print()

  env.execute()
}
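The SQL example builds its query by concatenating a Table object into a string. This works because, in Flink 1.x, a Table's toString registers the table in its environment under a generated name and returns that name. The sketch below reproduces only the string-building part. NamedTable is a hypothetical stand-in whose toString returns a fixed name, and the query is built with Scala string interpolation instead of +.

```scala
object SqlStringSketch {
  // Hypothetical stand-in for a Flink Table: toString yields the name used in the FROM clause
  final class NamedTable(name: String) {
    override def toString: String = name
  }

  // Builds the same query text as the example; $table calls NamedTable.toString
  def tumbleCountQuery(table: NamedTable): String =
    s"select ch, count(ch) from $table group by ch, TUMBLE(ts, INTERVAL '10' SECOND)"
}
```

Interpolation and + both end up calling toString, so in the Flink program the two forms produce the same query.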

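To learn more about our big data training courses, follow Atguigu big data training. In addition to technical articles like this one, Atguigu offers free, high-quality big data training videos for students to download.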

