批处理wordcount
def main(args: Array[String]): Unit = { //构造执行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //读取文件 val input = “file:///d:/temp/hello.txt” val ds: DataSet[String] = env.readTextFile(input) // 其中flatMap 和Map 中 需要引入隐式转换 import org.apache.flink.api.scala.createTypeInformation //经过groupby进行分组,sum进行聚合 val aggDs: AggregateDataSet[(String, Int)] = ds.flatMap(_.split(” “)).map((_, 1)).groupBy(0).sum(1) // 打印 aggDs.print() } |
注意:Flink程序支持java 和 scala两种语言,本课程中以scala语言为主。
在引入包中,有java和scala两种包时注意要使用scala的包
流处理 wordcount
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object StreamWcApp { def main(args: Array[String]): Unit = { //从外部命令中获取参数 val tool: ParameterTool = ParameterTool.fromArgs(args) val host: String = tool.get(“host”) val port: Int = tool.get(“port”).toInt //创建流处理环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host,port) // flatMap和Map需要引用的隐式转换 import org.apache.flink.api.scala._ //处理 分组并且sum聚合 val dStream: DataStream[(String, Int)] = textDstream.flatMap(_.split(” “)).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1) //打印 dStream.print() env.execute() } |
测试
在linux系统中用
nc -lk 7777 |
进行发送测试
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习
上一篇: Java培训课程Redis之哨兵模式
下一篇: 前端培训之事件的各个阶段