大数据培训之自定义组件Source

发布时间:2020年06月16日作者:atguigu浏览次数:1,180

自定义Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。

官方也提供了自定义source的接口:

https://flume.apache.org/FlumeDeveloperGuide.html#source根据官方说明自定义MySource需要继承AbstractSource类并实现Configurable和PollableSource接口。

实现相应方法:

getBackOffSleepIncrement()//暂不用

getMaxBackOffSleepInterval()//暂不用

configure(Context context)//初始化context(读取配置文件内容)

process()//获取数据封装成event并写入channel,这个方法将被循环调用。

使用场景:读取MySQL数据或者其他文件系统。

1 案例需求分析

使用flume接收数据,并给每条数据添加前缀,输出到控制台。前缀可从flume配置文件中配置。

大数据培训
大数据培训

2 案例编码实现

package com.atguigu;

import org.apache.flume.Context;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.PollableSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.SimpleEvent;

import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    //定义配置文件将来要读取的字段

    private Long delay;

    private String field;

    //初始化配置信息

    @Override

    public void configure(Context context) {

        delay = context.getLong(“delay”);

        field = context.getString(“field”, “Hello!”);

    }

    @Override

    public Status process() throws EventDeliveryException {

        try {

            //创建事件头信息

            HashMap<String, String> hearderMap = new HashMap<>();

            //创建事件

            SimpleEvent event = new SimpleEvent();

            //循环封装事件

            for (int i = 0; i < 5; i++) {

                //给事件设置头信息

                event.setHeaders(hearderMap);

                //给事件设置内容

                event.setBody((field + i).getBytes());

                //将事件写入channel

                getChannelProcessor().processEvent(event);

                Thread.sleep(delay);

            }

        } catch (Exception e) {

            e.printStackTrace();

            return Status.BACKOFF;

        }

        return Status.READY;

    }

    @Override

    public long getBackOffSleepIncrement() {

        return 0;

    }

    @Override

    public long getMaxBackOffSleepInterval() {

        return 0;

    }

}

3 案例测试

1)打包

将写好的代码打包,并放到flume的lib目录(/opt/module/flume)下。

2)配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = com.atguigu.MySource

a1.sources.r1.delay = 1000

#a1.sources.r1.field = atguigu

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3)开启任务

[atguigu@hadoop102 flume]$ pwd

/opt/module/flume

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

4)结果展示

大数据培训

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)