Custom InputFormat

Hands-on case: implementing a custom InputFormat

Both HDFS and MapReduce handle small files very inefficiently, yet workloads with large numbers of small files are often unavoidable, so a remedy is needed. One remedy is to implement a custom InputFormat that merges the small files.

1. Requirement

Merge multiple small files into a single SequenceFile (the Hadoop file format for storing binary key-value pairs). The SequenceFile holds one record per input file: the file path plus name is the key, and the file contents are the value.
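For example, with two hypothetical input files e:/input/a.txt and e:/input/b.txt, the merged SequenceFile would contain two records: one keyed by the path of a.txt whose value is the raw bytes of a.txt, and likewise for b.txt. The exact key text (something like file:/e:/input/a.txt) comes from Path.toString() in the RecordReader below.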

(1) Input data

(The screenshot of the sample input files is omitted here.)

(2) Expected output file format

(The screenshot of the expected SequenceFile output is omitted here.)

2. Requirement analysis

(The original analysis diagram is omitted here.) In outline: define a WholeFileInputformat whose isSplitable returns false so that no file is ever split; give it a RecordReader that reads one whole file per split into a single (Text path, BytesWritable contents) record; pass the records through an identity-style Mapper and Reducer; and write the final output with SequenceFileOutputFormat.

3. Implementation

(1) Custom InputFormat

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Extend FileInputFormat; the key is the file path, the value is the file contents
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split a file, so each small file stays in a single split
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
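A note on the design: returning false from isSplitable guarantees that the framework hands each file to the RecordReader as one whole split, which is what allows an entire file to become a single key-value pair. It also means a file can never be processed by more than one map task.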

(2) Custom RecordReader class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;

    // true until the single whole-file record has been emitted
    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            // 1 Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            FSDataInputStream fis = null;
            try {
                // 2 Get the file system for this path
                Path path = split.getPath();
                FileSystem fs = path.getFileSystem(configuration);

                // 3 Open the file
                fis = fs.open(path);

                // 4 Read the entire file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 Set the file contents as the value
                value.set(contents, 0, contents.length);

                // 6 Get the file path and name
                String name = split.getPath().toString();

                // 7 Set the path as the output key
                k.set(name);
            } finally {
                IOUtils.closeStream(fis);
            }

            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress reporting is not meaningful for a single whole-file record
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
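The reader is deliberately one-shot: the first call to nextKeyValue buffers the entire file and returns true, and every later call returns false, so each mapper sees exactly one (path, contents) record per file. Because the buffer is sized from split.getLength(), the whole file must fit in a byte array on the heap, which is why this approach is only suitable for small files.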

(3) Write the SequenceFileMapper class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Pass the (file path, file contents) record straight through
        context.write(key, value);
    }
}
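The mapper's input types (Text, BytesWritable) must match the generic parameters declared on WholeFileInputformat and WholeRecordReader; since no transformation is needed, the map method simply forwards each record.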

(4) Write the SequenceFileReducer class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key (file path) carries exactly one value, so writing the first one is enough
        context.write(key, values.iterator().next());
    }
}
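Every key is a unique file path, so each reduce call receives exactly one value and writing the first element of the iterable loses nothing. With the default single reduce task, all records end up in one part file, which is precisely the merge the requirement asks for.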

(5) Write the SequenceFileDriver class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Adjust the input and output paths to match your own machine
        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

        // 1 Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar location and wire up the custom mapper and reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 3 Set the custom InputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 4 Set the OutputFormat to SequenceFileOutputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 5 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 6 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 7 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
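Once the job completes, it can be handy to read the merged file back and confirm the (path, contents) layout. Below is a minimal verification sketch using the standard org.apache.hadoop.io.SequenceFile.Reader API; the class name SequenceFileInspector is made up for this example, and it assumes the default output file name part-r-00000 under the e:/output1 directory used above.

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileInspector {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // part-r-00000 is the default name of the single reducer's output file
        // (assumption: default job settings, output path as in the driver above)
        Path path = new Path("e:/output1/part-r-00000");

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Each record pairs a source file path with that file's raw bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}

Each printed line should name one of the original small files together with its size in bytes, one record per merged file.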

