尚硅谷大数据技术之Flume(新)第3章 企业开发案例

  1. 4. 查看HDFS上的数据
    1. 5. 等待1s,再次查询upload文件夹

    [atguigu@hadoop102 upload]$ ll

    总用量 0

    -rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.log.COMPLETED

    -rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.tmp

    -rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.txt.COMPLETED

    3.4 单数据源多出口案例(选择器)

    单Source多Channel、Sink如图7-2所示。

  2. 1)案例需求:使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

    2)需求分析:

  3. 3)实现步骤:

    0.准备工作

    在/opt/module/flume/job目录下创建group1文件夹

    [atguigu@hadoop102 job]$ cd group1/

    在/opt/module/datas/目录下创建flume3文件夹

    [atguigu@hadoop102 datas]$ mkdir flume3

    1.创建flume-file-flume.conf

    配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。

    创建配置文件并打开

    [atguigu@hadoop102 group1]$ touch flume-file-flume.conf

    [atguigu@hadoop102 group1]$ vim flume-file-flume.conf

    添加如下内容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # 将数据流复制给所有channel

    a1.sources.r1.selector.type = replicating

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

    a1.sources.r1.shell = /bin/bash -c

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop102 

    a1.sinks.k1.port = 4141

     

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop102

    a1.sinks.k2.port = 4142

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2

    注:Avro是由Hadoop创始人Doug Cutting创建的一种语言无关的数据序列化和RPC框架。

    注:RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

    2.创建flume-flume-hdfs.conf

    配置上级Flume输出的Source,输出是到HDFS的Sink。

    创建配置文件并打开

    [atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf

    [atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf

    添加如下内容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = avro

    a2.sources.r1.bind = hadoop102

    a2.sources.r1.port = 4141

     

    # Describe the sink

    a2.sinks.k1.type = hdfs

    a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H

    #上传文件的前缀

    a2.sinks.k1.hdfs.filePrefix = flume2-

    #是否按照时间滚动文件夹

    a2.sinks.k1.hdfs.round = true

    #多少时间单位创建一个新的文件夹

    a2.sinks.k1.hdfs.roundValue = 1

    #重新定义时间单位

    a2.sinks.k1.hdfs.roundUnit = hour

    #是否使用本地时间戳

    a2.sinks.k1.hdfs.useLocalTimeStamp = true

    #积攒多少个Event才flush到HDFS一次

    a2.sinks.k1.hdfs.batchSize = 100

    #设置文件类型,可支持压缩

    a2.sinks.k1.hdfs.fileType = DataStream

    #多久生成一个新的文件

    a2.sinks.k1.hdfs.rollInterval = 600

    #设置每个文件的滚动大小大概是128M

    a2.sinks.k1.hdfs.rollSize = 134217700

    #文件的滚动与Event数量无关

    a2.sinks.k1.hdfs.rollCount = 0

    #最小冗余数

    a2.sinks.k1.hdfs.minBlockReplicas = 1

     

    # Describe the channel

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1