尚硅谷大数据技术之Hadoop(MapReduce)(新)第3章 MapReduce框架原理

3.8 计数器应用

3.9 数据清洗(ETL)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

3.9.1 数据清洗案例实操-简单解析版

1.需求

去除日志中字段长度小于等于11的日志。

(1)输入数据

web.log

(2)期望输出数据

每行字段长度都大于11。

2.需求分析

需要在Map阶段对输入的数据根据规则进行过滤清洗。

3.实现代码

(1)编写LogMapper类

package com.atguigu.mapreduce.weblog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

Text k = new Text();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 1 获取1行数据

String line = value.toString();

// 2 解析日志

boolean result = parseLog(line,context);

// 3 日志不合法退出

if (!result) {

return;

}

// 4 设置key

k.set(line);

// 5 写出数据

context.write(k, NullWritable.get());

}

 

// 2 解析日志

private boolean parseLog(String line, Context context) {

 

// 1 截取

String[] fields = line.split(" ");

// 2 日志长度大于11的为合法

if (fields.length > 11) {

 

// 系统计数器

context.getCounter("map", "true").increment(1);

return true;

}else {

context.getCounter("map", "false").increment(1);

return false;

}

}

}

 

(2)编写LogDriver类

package com.atguigu.mapreduce.weblog;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class LogDriver {

 

public static void main(String[] args) throws Exception {

 

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置

        args = new String[] { "e:/input/inputlog", "e:/output1" };

 

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

 

// 2 加载jar包

job.setJarByClass(LogDriver.class);

 

// 3 关联map

job.setMapperClass(LogMapper.class);

 

// 4 设置最终输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

 

// 设置reducetask个数为0

job.setNumReduceTasks(0);

 

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6 提交

job.waitForCompletion(true);

}

}