3.1.7 KeyValueTextInputFormat Usage Example
1. Requirement
Count how many lines in the input file share the same first word.
(1) Input data
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
(2) Expected output data
banzhang 2
xihuan 2
2. Requirement Analysis
KeyValueTextInputFormat splits each input line at the first occurrence of the configured separator (a single space here): the text before the separator becomes the key, and the text after it becomes the value. The Mapper therefore only needs to emit the key it receives together with a count of 1, and the Reducer sums the counts per key, giving the number of lines that start with each first word. A conceptual sketch of the split is shown below.
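As a rough illustration of that split, here is a minimal, self-contained sketch. It is not Hadoop's actual KeyValueLineRecordReader implementation; the class name and logic are made up for this example only:

// Conceptual sketch only: mimics how KeyValueTextInputFormat derives key and value
// from one text line. Not Hadoop source code; class name is illustrative.
public class KeyValueSplitSketch {

	public static void main(String[] args) {
		String separator = " ";              // same separator the Driver configures below
		String line = "banzhang ni hao";     // a sample line from the input data in (1)

		int pos = line.indexOf(separator);
		// Everything before the first separator becomes the key,
		// everything after it becomes the value.
		String key = (pos == -1) ? line : line.substring(0, pos);
		String value = (pos == -1) ? "" : line.substring(pos + separator.length());

		System.out.println("key   = " + key);    // banzhang
		System.out.println("value = " + value);  // ni hao
	}
}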
3. Code Implementation
(1) Write the Mapper class
package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {

	// 1 Set the output value (every line counts as 1)
	LongWritable v = new LongWritable(1);

	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {

		// Input line example: banzhang ni hao
		// key = "banzhang", value = "ni hao"

		// 2 Write out the key with a count of 1
		context.write(key, v);
	}
}
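To sanity-check the Mapper without running a cluster, a unit test along the following lines could be written with the Apache MRUnit library. MRUnit is not part of this tutorial's setup, so treat the dependency and the test class below as an assumption, sketched only to show what the Mapper receives and emits:

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

// Sketch of a unit test for KVTextMapper; assumes the MRUnit test library is on the classpath.
public class KVTextMapperTest {

	@Test
	public void mapEmitsFirstWordWithCountOne() throws Exception {
		// KeyValueTextInputFormat would hand the Mapper key = "banzhang", value = "ni hao"
		MapDriver.newMapDriver(new KVTextMapper())
				.withInput(new Text("banzhang"), new Text("ni hao"))
				.withOutput(new Text("banzhang"), new LongWritable(1))
				.runTest();
	}
}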
(2) Write the Reducer class
package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	LongWritable v = new LongWritable();

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {

		long sum = 0L;

		// 1 Sum up the counts for this key
		for (LongWritable value : values) {
			sum += value.get();
		}

		v.set(sum);

		// 2 Write out the key and its total count
		context.write(key, v);
	}
}
(3) Write the Driver class
package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		// Set the key/value separator (a single space)
		conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

		// 1 Get the Job instance
		Job job = Job.getInstance(conf);

		// 2 Set the jar location and attach the Mapper and Reducer
		job.setJarByClass(KVTextDriver.class);
		job.setMapperClass(KVTextMapper.class);
		job.setReducerClass(KVTextReducer.class);

		// 3 Set the map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// 4 Set the final output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// 5 Set the input path
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// Set the input format
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		// 6 Set the output path
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 Submit the job
		job.waitForCompletion(true);
	}
}
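After packaging the project into a jar, the job can be submitted with the hadoop jar command, for example: hadoop jar mr-demo.jar com.atguigu.mapreduce.KeyValueTextInputFormat.KVTextDriver /user/atguigu/input /user/atguigu/output (the jar name and the HDFS paths here are placeholders for illustration). With the sample input from (1), the part-r-00000 file in the output directory should contain the expected result from (2): banzhang 2 and xihuan 2.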