尚硅谷大数据技术之Flume(新)第5章 Flume高级之自定义MySQLSource

5.4.4 MySQLSource

代码实现:

package com.atguigu.source;

 

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.PollableSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.SimpleEvent;

import org.apache.flume.source.AbstractSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import java.text.ParseException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

 

public class SQLSource extends AbstractSource implements Configurable, PollableSource {

 

    //打印日志

private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

 

    //定义sqlHelper

    private SQLSourceHelper sqlSourceHelper;

 

 

    @Override

    public long getBackOffSleepIncrement() {

        return 0;

    }

 

    @Override

    public long getMaxBackOffSleepInterval() {

        return 0;

    }

 

    @Override

public void configure(Context context) {

 

        try {

            //初始化

            sqlSourceHelper = new SQLSourceHelper(context);

        } catch (ParseException e) {

            e.printStackTrace();

        }

    }

 

    @Override

public Status process() throws EventDeliveryException {

 

        try {

            //查询数据表

            List<List<Object>> result = sqlSourceHelper.executeQuery();

 

            //存放event的集合

            List<Event> events = new ArrayList<>();

 

            //存放event头集合

            HashMap<String, String> header = new HashMap<>();

 

            //如果有返回数据,则将数据封装为event

            if (!result.isEmpty()) {

 

                List<String> allRows = sqlSourceHelper.getAllRows(result);

 

                Event event = null;

 

                for (String row : allRows) {

                    event = new SimpleEvent();

                    event.setBody(row.getBytes());

                    event.setHeaders(header);

                    events.add(event);

                }

 

                //将event写入channel

                this.getChannelProcessor().processEventBatch(events);

 

                //更新数据表中的offset信息

                sqlSourceHelper.updateOffset2DB(result.size());

            }

 

            //等待时长

            Thread.sleep(sqlSourceHelper.getRunQueryDelay());

 

            return Status.READY;

        } catch (InterruptedException e) {

            LOG.error("Error procesing row", e);

 

            return Status.BACKOFF;

        }

    }

 

    @Override

public synchronized void stop() {

 

        LOG.info("Stopping sql source {} ...", getName());

 

        try {

            //关闭资源

            sqlSourceHelper.close();

        } finally {

            super.stop();

        }

    }

}

5.5 测试

5.5.1 Jar包准备

1) 将MySQL驱动包放入Flume的lib目录下

[atguigu@hadoop102 flume]$ cp \

/opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \

/opt/module/flume/lib/

2) 打包项目并将Jar包放入Flume的lib目录下

5.5.2 配置文件准备

1)创建配置文件并打开

[atguigu@hadoop102 job]$ touch mysql.conf

[atguigu@hadoop102 job]$ vim mysql.conf

2)添加如下内容

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = com.atguigu.source.SQLSource  

a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource

a1.sources.r1.connection.user = root  

a1.sources.r1.connection.password = 000000  

a1.sources.r1.table = student  

a1.sources.r1.columns.to.select = *  

#a1.sources.r1.incremental.column.name = id  

#a1.sources.r1.incremental.value = 0

a1.sources.r1.run.query.delay=5000

 

# Describe the sink

a1.sinks.k1.type = logger

 

# Describe the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1