尚硅谷大数据技术之HBase(新)第6章 HBase API操作

6.3.3 自定义HBase-MapReduce2

目标:实现将HDFS中的数据写入到HBase表中。

分步实现:

1.构建ReadFruitFromHDFSMapper类,用于读取HDFS中的文件数据

package com.atguigu.hbase.mr2;

 

import java.io.IOException;

 

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//从HDFS中读取的数据

String lineValue = value.toString();

//读取出来的每行数据使用\t进行分割,存于String数组

String[] values = lineValue.split("\t");

//根据数据中值的含义取值

String rowKey = values[0];

String name = values[1];

String color = values[2];

//初始化rowKey

ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));

//初始化put对象

Put put = new Put(Bytes.toBytes(rowKey));

//参数分别:列族、列、值  

        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),  Bytes.toBytes(name));

        put.add(Bytes.toBytes("info"), Bytes.toBytes("color"),  Bytes.toBytes(color));

 

        context.write(rowKeyWritable, put);

}

}

2.构建WriteFruitMRFromTxtReducer类

package com.atguigu.hbase.mr2;

 

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import org.apache.hadoop.io.NullWritable;

 

/**
 * Reducer that forwards every {@link Put} it receives to the job's output,
 * using a {@link NullWritable} output key.
 */
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each Put grouped under the given row key to the output, unchanged.
     *
     * @param key     row key shared by the grouped Puts (not used when writing)
     * @param values  Puts emitted by the map stage for this row key
     * @param context task context used to emit the output
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        final NullWritable outputKey = NullWritable.get();
        java.util.Iterator<Put> pending = values.iterator();
        while (pending.hasNext()) {
            // Emit each Put as soon as it is read from the iterator.
            context.write(outputKey, pending.next());
        }
    }
}

3.创建Txt2FruitRunner组装Job

public int run(String[] args) throws Exception {

//得到Configuration

Configuration conf = this.getConf();

 

//创建Job任务

Job job = Job.getInstance(conf, this.getClass().getSimpleName());

job.setJarByClass(Txt2FruitRunner.class);

Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");

FileInputFormat.addInputPath(job, inPath);

 

//设置Mapper

job.setMapperClass(ReadFruitFromHDFSMapper.class);

job.setMapOutputKeyClass(ImmutableBytesWritable.class);

job.setMapOutputValueClass(Put.class);

 

//设置Reducer

TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);

 

//设置Reduce数量,最少1个

job.setNumReduceTasks(1);

 

boolean isSuccess = job.waitForCompletion(true);

if(!isSuccess){

throw new IOException("Job running with error");

}

 

return isSuccess ? 0 : 1;

}

4.调用执行Job

public static void main(String[] args) throws Exception {

Configuration conf = HBaseConfiguration.create();

    int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);

    System.exit(status);

}

5.打包运行

$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner

提示:运行任务前,如果待导入数据的目标表不存在,则需要提前创建该表。

提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)