大数据培训课程自定义OutputFormat案例实操

发布时间:2020年09月15日作者:atguigu浏览次数:745

自定义OutputFormat案例实操

1.需求

       过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

大数据培训课程

(2)期望输出数据

http://www.atguigu.com

大数据培训课程

2.需求分析

大数据培训课程

3.案例实操

(1)编写FilterMapper类

package com.atguigu.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;   public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{     @Override   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {         // 写出       context.write(value, NullWritable.get());   } }

(2)编写FilterReducer类

package com.atguigu.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;   public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {   Text k = new Text();     @Override   protected void reduce(Text key, Iterable<NullWritable> values, Context context)     throws IOException, InterruptedException {          // 1 获取一行       String line = key.toString();          // 2 拼接       line = line + “\r\n”;          // 3 设置key        k.set(line);          // 4 输出       context.write(k, NullWritable.get());   } }

(3)自定义一个OutputFormat类

package com.atguigu.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{     @Override   public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)      throws IOException, InterruptedException {         // 创建一个RecordWriter       return new FilterRecordWriter(job);   } }

(4)编写RecordWriter类

package com.atguigu.mapreduce.outputformat; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext;   public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {     FSDataOutputStream atguiguOut = null;   FSDataOutputStream otherOut = null;     public FilterRecordWriter(TaskAttemptContext job) {         // 1 获取文件系统       FileSystem fs;         try {          fs = FileSystem.get(job.getConfiguration());            // 2 创建输出文件路径          Path atguiguPath = new Path(“e:/atguigu.log”);          Path otherPath = new Path(“e:/other.log”);            // 3 创建输出流          atguiguOut = fs.create(atguiguPath);          otherOut = fs.create(otherPath);       } catch (IOException e) {          e.printStackTrace();       }   }     @Override   public void write(Text key, NullWritable value) throws IOException, InterruptedException {         // 判断是否包含“atguigu”输出到不同文件       if (key.toString().contains(“atguigu”)) {          atguiguOut.write(key.toString().getBytes());       } else {          otherOut.write(key.toString().getBytes());       }   }     @Override   public void close(TaskAttemptContext context) throws IOException, InterruptedException {         // 关闭资源 IOUtils.closeStream(atguiguOut);       IOUtils.closeStream(otherOut);   } }

(5)编写FilterDriver类

package com.atguigu.mapreduce.outputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   public class FilterDriver {     public static void main(String[] args) throws Exception {   // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { “e:/input/inputoutputformat”, “e:/output2” };         Configuration conf = new Configuration();       Job job = Job.getInstance(conf);         job.setJarByClass(FilterDriver.class);       job.setMapperClass(FilterMapper.class);       job.setReducerClass(FilterReducer.class);         job.setMapOutputKeyClass(Text.class);       job.setMapOutputValueClass(NullWritable.class);             job.setOutputKeyClass(Text.class);       job.setOutputValueClass(NullWritable.class);         // 要将自定义的输出格式组件设置到job中       job.setOutputFormatClass(FilterOutputFormat.class);         FileInputFormat.setInputPaths(job, new Path(args[0]));         // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat       // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录       FileOutputFormat.setOutputPath(job, new Path(args[1]));         boolean result = job.waitForCompletion(true);       System.exit(result ? 0 : 1);   } }

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)