尚硅谷大数据技术之电信客服
4) 创建类:MySQLOutputFormat
package com.atguigu.analysis.format; import com.atguigu.analysis.converter.impl.DimensionConverter; import com.atguigu.analysis.kv.base.BaseDimension; import com.atguigu.analysis.kv.base.BaseValue; import com.atguigu.analysis.kv.impl.ComDimension; import com.atguigu.analysis.kv.impl.CountDurationValue; import com.atguigu.constants.Constants; import com.atguigu.utils.JDBCCacheBean; import com.atguigu.utils.JDBCUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; public class MySQLOutputFormat extends OutputFormat<BaseDimension, BaseValue> { @Override public RecordWriter<BaseDimension, BaseValue> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //创建jdbc连接 Connection conn = null; try { conn = JDBCCacheBean.getInstance(); //关闭自动提交,以便于批量提交 conn.setAutoCommit(false); } catch (SQLException e) { throw new IOException(e); } return new MysqlRecordWriter(conn); } @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { // 校检输出 } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { String name = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR); Path output = name == null ? null : new Path(name); return new FileOutputCommitter(output, taskAttemptContext); } static class MysqlRecordWriter extends RecordWriter<BaseDimension, BaseValue> { private Connection conn = null; private DimensionConverter dimensionConverter = null; private PreparedStatement preparedStatement = null; private int batchNumber = 0; int count = 0; public MysqlRecordWriter(Connection conn) { this.conn = conn; this.batchNumber = Constants.JDBC_DEFAULT_BATCH_NUMBER; this.dimensionConverter = new DimensionConverter(); } @Override public void write(BaseDimension key, BaseValue value) throws IOException, InterruptedException { try { // 统计当前PreparedStatement对象待提交的数据量 String sql = "INSERT INTO `tb_call`(`id_date_contact`, `id_date_dimension`, `id_contact`, `call_sum`, `call_duration_sum`) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE `id_date_contact` = ? ;"; if (preparedStatement == null) { preparedStatement = conn.prepareStatement(sql); } // 本次sql int i = 0; ComDimension comDimension = (ComDimension) key; CountDurationValue countDurationValue = (CountDurationValue) value; int id_date_dimension = dimensionConverter.getDimensionId(comDimension.getDateDimension()); int id_contact = dimensionConverter.getDimensionId(comDimension.getContactDimension()); int call_sum = countDurationValue.getCallSum(); int call_duration_sum = countDurationValue.getCallDurationSum(); String id_date_contact = id_date_dimension + "_" + id_contact; preparedStatement.setString(++i, id_date_contact); preparedStatement.setInt(++i, id_date_dimension); preparedStatement.setInt(++i, id_contact); preparedStatement.setInt(++i, call_sum); preparedStatement.setInt(++i, call_duration_sum); preparedStatement.setString(++i, id_date_contact); preparedStatement.addBatch(); //当前缓存了多少个sql语句等待批量执行,计数器 count++; // 批量提交 if (count >= this.batchNumber) { preparedStatement.executeBatch(); // 批量提交 conn.commit(); // 连接提交 count = 0; } } catch (SQLException e) { e.printStackTrace(); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { try { preparedStatement.executeBatch(); this.conn.commit(); } catch (SQLException e) { e.printStackTrace(); }finally { JDBCUtil.close(conn, preparedStatement, null); } } } } |
5) 创建类:BaseDimension
package com.atguigu.analysis.kv.base; import org.apache.hadoop.io.WritableComparable; public abstract class BaseDimension implements WritableComparable<BaseDimension> {} |
6) 创建类:BaseValue
package com.atguigu.analysis.kv.base; import org.apache.hadoop.io.Writable; public abstract class BaseValue implements Writable { } |
7) 创建类:ComDimension
package com.atguigu.analysis.kv.impl; import com.atguigu.analysis.kv.base.BaseDimension; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class ComDimension extends BaseDimension{ //时间维度 private DateDimension dateDimension = new DateDimension(); //联系人维度 private ContactDimension contactDimension = new ContactDimension(); public ComDimension() { super(); } public ComDimension(DateDimension dateDimension, ContactDimension contactDimension) { super(); this.dateDimension = dateDimension; this.contactDimension = contactDimension; } public DateDimension getDateDimension() { return dateDimension; } public void setDateDimension(DateDimension dateDimension) { this.dateDimension = dateDimension; } public ContactDimension getContactDimension() { return contactDimension; } public void setContactDimension(ContactDimension contactDimension) { this.contactDimension = contactDimension; } @Override public int compareTo(BaseDimension o) { if(this == o) return 0; ComDimension comDimension = (ComDimension) o; int tmp = this.dateDimension.compareTo(comDimension.getDateDimension()); if(tmp != 0) return tmp; tmp = this.contactDimension.compareTo(comDimension.getContactDimension()); return tmp; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ComDimension that = (ComDimension) o; if (dateDimension != null ? !dateDimension.equals(that.dateDimension) : that.dateDimension != null) return false; return contactDimension != null ? contactDimension.equals(that.contactDimension) : that.contactDimension == null; } @Override public int hashCode() { int result = dateDimension != null ? dateDimension.hashCode() : 0; result = 31 * result + (contactDimension != null ? contactDimension.hashCode() : 0); return result; } @Override public void write(DataOutput dataOutput) throws IOException { this.dateDimension.write(dataOutput); this.contactDimension.write(dataOutput); } @Override public void readFields(DataInput dataInput) throws IOException { this.dateDimension.readFields(dataInput); this.contactDimension.readFields(dataInput); } } |