尚硅谷大数据技术之Flume(新)第5章 Flume高级之自定义MySQLSource

5.4.4 MySQLSource


package com.atguigu.source;


import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.PollableSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.SimpleEvent;

import org.apache.flume.source.AbstractSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import java.text.ParseException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;


/**
 * Custom Flume pollable source backed by {@link SQLSourceHelper}.
 *
 * <p>As shown here, {@link #configure(Context)} builds the helper from the agent
 * context, and {@link #process()} asks the helper for query results and creates
 * one {@link SimpleEvent} per returned row.
 *
 * <p>NOTE(review): this listing looks truncated. Several method bodies stop early
 * and the closing braces of the methods and of the class are missing, so it will
 * not compile as printed. Restore the missing lines from the complete project
 * source before using it.
 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {


// Logger for this class.
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);


    // Helper used to run the query and turn results into row strings; assigned in configure().
    private SQLSourceHelper sqlSourceHelper;



    // Returns 0 for the back-off sleep increment.
    public long getBackOffSleepIncrement() {

        return 0;

        // NOTE(review): the closing brace of this method is not present in the listing.


    // Returns 0 for the maximum back-off sleep interval.
    public long getMaxBackOffSleepInterval() {

        return 0;

        // NOTE(review): the closing brace of this method is not present in the listing.


/**
 * Creates the {@link SQLSourceHelper} from the supplied Flume context.
 *
 * @param context configuration passed straight to the helper's constructor
 */
public void configure(Context context) {


        try {

            // Build the helper from the agent configuration.
            sqlSourceHelper = new SQLSourceHelper(context);

        } catch (ParseException e) {

            // NOTE(review): the handler body and the closing braces are not visible
            // in this listing; confirm how ParseException is meant to be handled.




/**
 * Queries rows through the helper and starts wrapping each row in an event.
 *
 * @return {@code Status.READY} on the normal path, {@code Status.BACKOFF} when an
 *         {@link InterruptedException} is caught
 * @throws EventDeliveryException declared by the signature; no throw site is visible here
 */
public Status process() throws EventDeliveryException {


        try {

            // Fetch the query result; each inner list is presumably one row (verify in the helper).
            List<List<Object>> result = sqlSourceHelper.executeQuery();


            // Collection intended to hold the events built below.
            List<Event> events = new ArrayList<>();


            // Header map created empty; how it is attached to events is not visible here.
            HashMap<String, String> header = new HashMap<>();


            // Only build events when the query returned something.
            if (!result.isEmpty()) {

                // Convert the raw result into one string per row via the helper.
                List<String> allRows = sqlSourceHelper.getAllRows(result);


                Event event = null;


                for (String row : allRows) {

                    event = new SimpleEvent();

                    // NOTE(review): the listing breaks off here. The rest of the loop
                    // body, the end of the if-block and any statements before the
                    // return are missing; nothing visible fills the event, adds it to
                    // 'events', or hands 'events' to a channel.













            return Status.READY;

        } catch (InterruptedException e) {

            // NOTE(review): the statement that can raise InterruptedException is not
            // visible in the try body above; presumably it is among the missing lines.
            LOG.error("Error procesing row", e);


            return Status.BACKOFF;

            // NOTE(review): closing braces of the catch block and the method are missing.



/**
 * Logs that the source is stopping.
 *
 * <p>NOTE(review): the try and finally bodies are empty or missing in this
 * listing; confirm what cleanup is expected there.
 */
public synchronized void stop() {


        LOG.info("Stopping sql source {} ...", getName());


        try {



        } finally {





5.5 测试

5.5.1 Jar包准备

1) MySql驱动包放入Flume的lib目录下

[atguigu@hadoop102 flume]$ cp \

/opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \

./lib/


2) 打包项目并将Jar包放入Flume的lib目录下

5.5.2 配置文件准备


[atguigu@hadoop102 job]$ touch mysql.conf

[atguigu@hadoop102 job]$ vim mysql.conf


# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = com.atguigu.source.SQLSource  

a1.sources.r1.connection.url = jdbc:mysql://<mysql-host>:3306/<database>

a1.sources.r1.connection.user = root  

a1.sources.r1.connection.password = 000000  

a1.sources.r1.table = student  

a1.sources.r1.columns.to.select = *  

#a1.sources.r1.incremental.column.name = id  

#a1.sources.r1.incremental.value = 0



# Describe the sink

a1.sinks.k1.type = logger


# Describe the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1