尚硅谷大数据技术之电信客服

4) 创建类:MySQLOutputFormat

package com.atguigu.analysis.format;

import com.atguigu.analysis.converter.impl.DimensionConverter;

import com.atguigu.analysis.kv.base.BaseDimension;

import com.atguigu.analysis.kv.base.BaseValue;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.CountDurationValue;

import com.atguigu.constants.Constants;

import com.atguigu.utils.JDBCCacheBean;

import com.atguigu.utils.JDBCUtil;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.SQLException;

public class MySQLOutputFormat extends OutputFormat<BaseDimension, BaseValue> {

    @Override

    public RecordWriter<BaseDimension, BaseValue> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        //创建jdbc连接

        Connection conn = null;

        try {

            conn = JDBCCacheBean.getInstance();

            //关闭自动提交,以便于批量提交

            conn.setAutoCommit(false);

        } catch (SQLException e) {

            throw new IOException(e);

        }

        return new MysqlRecordWriter(conn);

    }

    @Override

    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {

        // 校检输出

    }

    @Override

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        String name = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR);

        Path output = name == null ? null : new Path(name);

        return new FileOutputCommitter(output, taskAttemptContext);

    }

    static class MysqlRecordWriter extends RecordWriter<BaseDimension, BaseValue> {

        private Connection conn = null;

        private DimensionConverter dimensionConverter = null;

        private PreparedStatement preparedStatement = null;

        private int batchNumber = 0;

        int count = 0;

        public MysqlRecordWriter(Connection conn) {

            this.conn = conn;

            this.batchNumber = Constants.JDBC_DEFAULT_BATCH_NUMBER;

            this.dimensionConverter = new DimensionConverter();

        }

        @Override

        public void write(BaseDimension key, BaseValue value) throws IOException, InterruptedException {

            try {

                // 统计当前PreparedStatement对象待提交的数据量

                String sql = "INSERT INTO `tb_call`(`id_date_contact`, `id_date_dimension`, `id_contact`, `call_sum`, `call_duration_sum`) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id_date_contact` = ? ;";

                if (preparedStatement == null) {

                    preparedStatement = conn.prepareStatement(sql);

                }

                // 本次sql

                int i = 0;

                ComDimension comDimension = (ComDimension) key;

                CountDurationValue countDurationValue = (CountDurationValue) value;

                int id_date_dimension = dimensionConverter.getDimensionId(comDimension.getDateDimension());

                int id_contact = dimensionConverter.getDimensionId(comDimension.getContactDimension());

                int call_sum = countDurationValue.getCallSum();

                int call_duration_sum = countDurationValue.getCallDurationSum();

                String id_date_contact = id_date_dimension + "_" + id_contact;

                preparedStatement.setString(++i, id_date_contact);

                preparedStatement.setInt(++i, id_date_dimension);

                preparedStatement.setInt(++i, id_contact);

                preparedStatement.setInt(++i, call_sum);

                preparedStatement.setInt(++i, call_duration_sum);

                preparedStatement.setString(++i, id_date_contact);

                preparedStatement.addBatch();

                //当前缓存了多少个sql语句等待批量执行,计数器

                count++;

                // 批量提交

                if (count >= this.batchNumber) {

                    preparedStatement.executeBatch(); // 批量提交

                    conn.commit(); // 连接提交

                    count = 0;

                }

            } catch (SQLException e) {

                e.printStackTrace();

            }

        }

        @Override

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {

            try {

                preparedStatement.executeBatch();

                this.conn.commit();

            } catch (SQLException e) {

                e.printStackTrace();

            }finally {

                JDBCUtil.close(conn, preparedStatement, null);

            }

        }

    }

}

5) 创建类:BaseDimension

package com.atguigu.analysis.kv.base;

import org.apache.hadoop.io.WritableComparable;

/**
 * Common abstract base for dimension keys.
 *
 * <p>Declares no members of its own; it only fixes the contract that every
 * dimension is a {@link WritableComparable} comparable against other
 * {@code BaseDimension} instances.
 */
public abstract class BaseDimension implements WritableComparable<BaseDimension> {
}

6) 创建类:BaseValue

package com.atguigu.analysis.kv.base;

import org.apache.hadoop.io.Writable;

/**
 * Common abstract base for value types.
 *
 * <p>Declares no members of its own; it only fixes the contract that every
 * value is a Hadoop {@link Writable}.
 */
public abstract class BaseValue implements Writable {
}

7) 创建类:ComDimension

package com.atguigu.analysis.kv.impl;

import com.atguigu.analysis.kv.base.BaseDimension;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class ComDimension extends BaseDimension{

    //时间维度

    private DateDimension dateDimension = new DateDimension();

    //联系人维度

    private ContactDimension contactDimension = new ContactDimension();

    public ComDimension() {

        super();

    }

    public ComDimension(DateDimension dateDimension, ContactDimension contactDimension) {

        super();

        this.dateDimension = dateDimension;

        this.contactDimension = contactDimension;

    }

    public DateDimension getDateDimension() {

        return dateDimension;

    }

    public void setDateDimension(DateDimension dateDimension) {

        this.dateDimension = dateDimension;

    }

    public ContactDimension getContactDimension() {

        return contactDimension;

    }

    public void setContactDimension(ContactDimension contactDimension) {

        this.contactDimension = contactDimension;

    }

    @Override

    public int compareTo(BaseDimension o) {

        if(this == o) return 0;

        ComDimension comDimension = (ComDimension) o;

        int tmp = this.dateDimension.compareTo(comDimension.getDateDimension());

        if(tmp != 0) return tmp;

        tmp = this.contactDimension.compareTo(comDimension.getContactDimension());

        return tmp;

    }

    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        ComDimension that = (ComDimension) o;

        if (dateDimension != null ? !dateDimension.equals(that.dateDimension) : that.dateDimension != null) return false;

        return contactDimension != null ? contactDimension.equals(that.contactDimension) : that.contactDimension == null;

    }

    @Override

    public int hashCode() {

        int result = dateDimension != null ? dateDimension.hashCode() : 0;

        result = 31 * result + (contactDimension != null ? contactDimension.hashCode() : 0);

        return result;

    }

    @Override

    public void write(DataOutput dataOutput) throws IOException {

        this.dateDimension.write(dataOutput);

        this.contactDimension.write(dataOutput);

    }

    @Override

    public void readFields(DataInput dataInput) throws IOException {

        this.dateDimension.readFields(dataInput);

        this.contactDimension.readFields(dataInput);

    }

}