Kafka数据源(面试开发重点)_大数据培训

发布时间:2021年09月03日 | 作者:atguigu | 浏览次数:213

1 用法及说明

在工程中需要引入Maven工件spark-streaming-kafka-0-8_2.11来使用它。包内提供的 KafkaUtils对象可以在StreamingContext和JavaStreamingContext中以你的Kafka消息创建出 DStream。

两个核心类:KafkaUtils、KafkaCluster

2 案例实操

1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算(WordCount),最终打印到控制台。

(1)导入依赖

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>

    <version>2.1.1</version>

</dependency>

(2)编写代码

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka.KafkaCluster.Err

import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

import org.apache.spark.streaming.{Seconds, StreamingContext}

 

import scala.collection.mutable

 

object KafkaStreaming {

 

  def main(args: Array[String]): Unit = {

 

    //创建SparkConf对象

    val sparkConf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“KafkaStreaming”)

 

    //创建StreamingContext对象

    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

 

    //kafka参数声明

    val brokers = “hadoop102:9092,hadoop103:9092,hadoop104:9092”

    val topic = “first”

    val group = “bigdata”

    val deserialization = “org.apache.kafka.common.serialization.StringDeserializer”

 

    //定义Kafka参数

    val kafkaPara: Map[String, String] = Map[String, String](

      ConsumerConfig.GROUP_ID_CONFIG -> group,

      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,

      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,

      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization

    )

 

    //创建KafkaCluster(维护offset)

    val kafkaCluster = new KafkaCluster(kafkaPara)

 

    //获取ZK中保存的offset

    val fromOffset: Map[TopicAndPartition, Long] = getOffsetFromZookeeper(kafkaCluster, group, Set(topic))

 

    //读取kafka数据创建DStream

    val kafkaDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc,

      kafkaPara,

      fromOffset,

      (x: MessageAndMetadata[String, String]) => x.message())

 

    //数据处理

    kafkaDStream.print

 

    //提交offset

    offsetToZookeeper(kafkaDStream, kafkaCluster, group)

 

    ssc.start()

    ssc.awaitTermination()

  }

 

  //从ZK获取offset

  def getOffsetFromZookeeper(kafkaCluster: KafkaCluster, kafkaGroup: String, kafkaTopicSet: Set[String]): Map[TopicAndPartition, Long] = {

 

    // 创建Map存储Topic和分区对应的offset

    val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]()

 

    // 获取传入的Topic的所有分区

    // Either[Err, Set[TopicAndPartition]]  : Left(Err)   Right[Set[TopicAndPartition]]

    val topicAndPartitions: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(kafkaTopicSet)

 

    // 如果成功获取到Topic所有分区

    // topicAndPartitions: Set[TopicAndPartition]

    if (topicAndPartitions.isRight) {

      // 获取分区数据

      // partitions: Set[TopicAndPartition]

      val partitions: Set[TopicAndPartition] = topicAndPartitions.right.get

 

      // 获取指定分区的offset

      // offsetInfo: Either[Err, Map[TopicAndPartition, Long]]

      // Left[Err]  Right[Map[TopicAndPartition, Long]]

      val offsetInfo: Either[Err, Map[TopicAndPartition, Long]] = kafkaCluster.getConsumerOffsets(kafkaGroup, partitions)

 

      if (offsetInfo.isLeft) {

 

        // 如果没有offset信息则存储0

        // partitions: Set[TopicAndPartition]

        for (top <- partitions)

          topicPartitionOffsetMap += (top -> 0L)

      } else {

 

        // 如果有offset信息则存储offset

        // offsets: Map[TopicAndPartition, Long]

        val offsets: Map[TopicAndPartition, Long] = offsetInfo.right.get

        for ((top, offset) <- offsets)

          topicPartitionOffsetMap += (top -> offset)

      }

    }

    topicPartitionOffsetMap.toMap

  }

 

  //提交offset

  def offsetToZookeeper(kafkaDstream: InputDStream[String], kafkaCluster: KafkaCluster, kafka_group: String): Unit = {

    kafkaDstream.foreachRDD {

      rdd =>

        // 获取DStream中的offset信息

        // offsetsList: Array[OffsetRange]

        // OffsetRange: topic partition fromoffset untiloffset

        val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

 

        // 遍历每一个offset信息,并更新Zookeeper中的元数据

        // OffsetRange: topic partition fromoffset untiloffset

        for (offsets <- offsetsList) {

          val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)

          // ack: Either[Err, Map[TopicAndPartition, Short]]

          // Left[Err]

          // Right[Map[TopicAndPartition, Short]]

          val ack: Either[Err, Map[TopicAndPartition, Short]] = kafkaCluster.setConsumerOffsets(kafka_group, Map((topicAndPartition, offsets.untilOffset)))

          if (ack.isLeft) {

            println(s”Error updating the offset to Kafka cluster: ${ack.left.get}”)

          } else {

            println(s”update the offset to Kafka cluster: ${offsets.untilOffset} successfully”)

          }

        }

    }

  }

}

想要了解更多关于大数据培训课程的内容,欢迎关注尚硅谷大数据培训。尚硅谷除了这些技术文章外,还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)