
3.1.9 Custom InputFormat

To customize an InputFormat, extend FileInputFormat, provide a custom RecordReader that reads one complete file at a time into a single key-value pair, and register the new InputFormat in the Driver via job.setInputFormatClass.

3.1.10 Custom InputFormat Case Study

Both HDFS and MapReduce are very inefficient at handling small files, yet scenarios with large numbers of small files are often unavoidable, so a solution is needed. One approach is to write a custom InputFormat that merges the small files.

1. Requirement

Merge several small files into one SequenceFile (a SequenceFile is Hadoop's file format for storing key-value pairs in binary form). The SequenceFile holds all of the original files, using each file's path plus name as the key and the file's content as the value (a minimal sketch of this storage form follows the expected output below).

(1) Input data

one.txt

yongpeng weidong weinan
sanfeng luozong xiaoming

two.txt

longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin

three.txt

shuaige changmo zhenqiang
dongli lingu xuanxuan

(2) Expected output file format
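
The expected output is a single SequenceFile in which every record's key is a file path plus name and the value is that file's raw bytes. As a minimal, illustrative sketch of this storage form (the class name SequenceFileFormatSketch and the local path e:/output-sketch/merged.seq are hypothetical and not part of the case code), such a record could be written with the SequenceFile API as follows:

package com.atguigu.mapreduce.inputformat;

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileFormatSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical local output path, for illustration only
        Path path = new Path("e:/output-sketch/merged.seq");

        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));

            // One record per small file: key = file path + name, value = file content bytes
            byte[] content = "yongpeng weidong weinan\nsanfeng luozong xiaoming\n"
                    .getBytes(StandardCharsets.UTF_8);
            writer.append(new Text("e:/input/inputinputformat/one.txt"),
                    new BytesWritable(content));
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}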

2. Requirements analysis

A custom WholeFileInputformat marks every file as non-splittable, so each small file becomes exactly one split. Its WholeRecordReader reads the whole file into a single key-value pair: the key is the file path plus name and the value is the file content as a BytesWritable. The Mapper and Reducer simply pass the pair through, and the Driver uses SequenceFileOutputFormat so the merged records are written out as one SequenceFile.

3. Program implementation

(1) Custom InputFormat

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Define a class that extends FileInputFormat with Text keys and BytesWritable values
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split a file: each small file becomes exactly one split
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Return the custom RecordReader that reads a whole file as one record
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}

(2) Custom RecordReader class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;

    // One record per file: isProgress flips to false after the single record is emitted
    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {

            // 1 Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            FileSystem fs = null;
            FSDataInputStream fis = null;
            try {
                // 2 Get the file system for this split's path
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);

                // 3 Open the file
                fis = fs.open(path);

                // 4 Read the entire file content into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 Set the file content as the value
                value.set(contents, 0, contents.length);

                // 6 Get the file path and name
                String name = split.getPath().toString();

                // 7 Set the path and name as the key
                k.set(name);

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }

            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}

(3) Write the SequenceFileMapper class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Pass the record through unchanged: key = file path + name, value = file content
        context.write(key, value);
    }
}

(4) Write the SequenceFileReducer class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key (file path + name) has exactly one value, so write the first one
        context.write(key, values.iterator().next());
    }
}

(5) Write the SequenceFileDriver class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // The input and output paths must match the actual paths on your own machine
        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

        // 1 Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar location and hook up the custom Mapper and Reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 3 Use the custom InputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 4 Write the output as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 5 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 6 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 7 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
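
After the driver completes, the output directory should contain the merged result as a SequenceFile, normally named part-r-00000 by the reducer. The following is a minimal sketch for reading it back and confirming that each key is a file path plus name and each value carries that file's bytes; the class name SequenceFileVerify is hypothetical, and the path assumes the e:/output1 directory used in the driver above.

package com.atguigu.mapreduce.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileVerify {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default reducer output file under the output directory set in the driver
        Path path = new Path("e:/output1/part-r-00000");

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));

            Text key = new Text();
            BytesWritable value = new BytesWritable();

            // Each record corresponds to one merged small file
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

Alternatively, running hadoop fs -text on the output file should print each record as text.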