尚硅谷大数据技术之HBase(新)第6章 HBase API操作
6.3.3 自定义HBase-MapReduce2
目标:实现将HDFS中的数据写入到HBase表中。
分步实现:
1.构建ReadFruitFromHDFSMapper于读取HDFS中的文件数据
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //从HDFS中读取的数据 String lineValue = value.toString(); //读取出来的每行数据使用\t进行分割,存于String数组 String[] values = lineValue.split("\t"); //根据数据中值的含义取值 String rowKey = values[0]; String name = values[1]; String color = values[2]; //初始化rowKey ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey)); //初始化put对象 Put put = new Put(Bytes.toBytes(rowKey)); //参数分别:列族、列、值 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)); put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
context.write(rowKeyWritable, put); } } |
2.构建WriteFruitMRFromTxtReducer类
package com.z.hbase.mr2;
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //读出来的每一行数据写入到fruit_hdfs表中 for(Put put: values){ context.write(NullWritable.get(), put); } } } |
3.创建Txt2FruitRunner组装Job
public int run(String[] args) throws Exception { //得到Configuration Configuration conf = this.getConf();
//创建Job任务 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Txt2FruitRunner.class); Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv"); FileInputFormat.addInputPath(job, inPath);
//设置Mapper job.setMapperClass(ReadFruitFromHDFSMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class);
//设置Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);
//设置Reduce数量,最少1个 job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); }
return isSuccess ? 0 : 1; } |
4.调用执行Job
public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Txt2FruitRunner(), args); System.exit(status); } |
5.打包运行
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之。
提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)