Dstream创建自定义数据源_大数据培训

发布时间:2021年09月02日作者:atguigu浏览次数:361

自定义数据源

1 用法及说明

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

大数据培训

2 案例实操

1)需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

2)自定义数据源

package com.atguigu

import java.io.{BufferedReader, InputStreamReader}

import java.net.Socket

import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.receiver.Receiver

class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark

  override def onStart(): Unit = {

    new Thread(“Socket Receiver”) {

      override def run() {

        receive()

      }

    }.start()

  }

  //读数据并将数据发送给Spark

  def receive(): Unit = {

    //创建一个Socket

    var socket: Socket = new Socket(host, port)

    //定义一个变量,用来接收端口传过来的数据

    var input: String = null

    //创建一个BufferedReader用于读取端口传来的数据

    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

    //读取数据

    input = reader.readLine()

    //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark

    while (!isStopped() && input != null) {

      store(input)

      input = reader.readLine()

    }

    //跳出循环则关闭资源

    reader.close()

    socket.close()

    //重启任务

    restart(“restart”)

  }

  override def onStop(): Unit = {}

}

3)使用自定义的数据源采集数据

package com.atguigu

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.DStream

object FileStream {

  def main(args: Array[String]): Unit = {

    //1.初始化Spark配置信息

val sparkConf = new SparkConf().setMaster(“local[*]”)

.setAppName(“StreamWordCount”)

    //2.初始化SparkStreamingContext

    val ssc = new StreamingContext(sparkConf, Seconds(5))

//3.创建自定义receiver的Streaming

val lineStream = ssc.receiverStream(new CustomerReceiver(“hadoop102”, 9999))

    //4.将每一行数据做切分,形成一个个单词

    val wordStream = lineStream.flatMap(_.split(“\t”))

    //5.将单词映射成元组(word,1)

    val wordAndOneStream = wordStream.map((_, 1))

    //6.将相同的单词次数做统计

    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)

    //7.打印

    wordAndCountStream.print()

    //8.启动SparkStreamingContext

    ssc.start()

    ssc.awaitTermination()

  }

  }

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)