尚硅谷大数据技术之Hadoop(MapReduce)(新)第2章 Hadoop序列化

3.1.7 KeyValueTextInputFormat使用案例

1.需求

按每一行的第一个单词进行分组,统计第一个单词相同的行各有多少行。

(1)输入数据

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

(2)期望结果数据

banzhang 2

xihuan 2

2.需求分析

3.代码实现

(1)编写Mapper类

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable>{

// 1 设置value

   LongWritable v = new LongWritable(1);  

 

@Override

protected void map(Text key, Text value, Context context)

throws IOException, InterruptedException {

 

// banzhang ni hao

 

        // 2 写出

        context.write(key, v);  

}

}

(2)编写Reducer类

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    LongWritable v = new LongWritable();  

 

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

 long sum = 0L;  

 

 // 1 汇总统计

        for (LongWritable value : values) {  

            sum += value.get();  

        }

 

        v.set(sum);  

 

        // 2 输出

        context.write(key, v);  

}

}

(3)编写Driver类

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class KVTextDriver {

 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

// 设置切割符

conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

// 1 获取job对象

Job job = Job.getInstance(conf);

// 2 设置jar包位置,关联mapper和reducer

job.setJarByClass(KVTextDriver.class);

job.setMapperClass(KVTextMapper.class);

job.setReducerClass(KVTextReducer.class);

// 3 设置map输出kv类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

 

// 4 设置最终输出kv类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

// 5 设置输入输出数据路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

// 设置输入格式

job.setInputFormatClass(KeyValueTextInputFormat.class);

// 6 设置输出数据路径

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交job

job.waitForCompletion(true);

}

}