大数据培训 GroupingComparator分组(辅助排序)

 

对Reduce阶段的数据根据某一个或几个字段进行分组。

分组排序步骤:

(1)自定义类继承WritableComparator

(2)重写compare()方法

/**
 * Template for the compare() override of a custom grouping comparator.
 * This is an outline only: {@code result} is not declared in this snippet and
 * must be produced by the comparison logic that replaces the placeholder comment.
 */
@Override

public int compare(WritableComparable a, WritableComparable b) {

      // Business logic of the comparison goes here

      return result;

}

(3)创建一个构造方法,将比较对象的类传给父类

// Constructor that hands the class of the objects being compared (OrderBean) to the parent class.
// NOTE(review): the second argument `true` presumably maps to WritableComparator's
// "createInstances" flag — confirm against the Hadoop API documentation.
protected OrderGroupingComparator() {

      super(OrderBean.class, true);

}

3.3.10 GroupingComparator分组案例实操

1.需求

有如下订单数据

表4-2 订单数据

大数据培训

现在需要求出每一个订单中最贵的商品。

(1)输入数据

(2)期望输出数据

1       222.8

2       722.4

3       232.8

2.需求分析

(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。

(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如图4-18所示。

大数据培训

图4-18 过程分析

3.代码实现

(1)定义订单信息OrderBean类

package com.atguigu.mapreduce.order;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

 

public class OrderBean implements WritableComparable<OrderBean> {

 

  private int order_id; // 订单id号

  private double price; // 价格

 

  public OrderBean() {

      super();

  }

 

  public OrderBean(int order_id, double price) {

      super();

      this.order_id = order_id;

      this.price = price;

  }

 

  @Override

  public void write(DataOutput out) throws IOException {

      out.writeInt(order_id);

      out.writeDouble(price);

  }

 

  @Override

  public void readFields(DataInput in) throws IOException {

      order_id = in.readInt();

      price = in.readDouble();

  }

 

  @Override

  public String toString() {

      return order_id + “\t” + price;

  }

 

  public int getOrder_id() {

      return order_id;

  }

 

  public void setOrder_id(int order_id) {

      this.order_id = order_id;

  }

 

  public double getPrice() {

      return price;

  }

 

  public void setPrice(double price) {

      this.price = price;

  }

 

  // 二次排序

  @Override

  public int compareTo(OrderBean o) {

 

      int result;

 

      if (order_id > o.getOrder_id()) {

         result = 1;

      } else if (order_id < o.getOrder_id()) {

         result = -1;

      } else {

         // 价格倒序排序

         result = price > o.getPrice() ? -1 : 1;

      }

 

      return result;

  }

}

(2)编写OrderMapper类

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

 

  OrderBean k = new OrderBean();

 

  @Override

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

      // 1 获取一行

      String line = value.toString();

 

      // 2 截取

      String[] fields = line.split(“\t”);

 

      // 3 封装对象

      k.setOrder_id(Integer.parseInt(fields[0]));

      k.setPrice(Double.parseDouble(fields[2]));

 

      // 4 写出

      context.write(k, NullWritable.get());

  }

}

(3)编写OrderGroupingComparator类

package com.atguigu.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

public class OrderGroupingComparator extends WritableComparator {

 

  protected OrderGroupingComparator() {

      super(OrderBean.class, true);

  }

 

  @Override

  public int compare(WritableComparable a, WritableComparable b) {

 

      OrderBean aBean = (OrderBean) a;

      OrderBean bBean = (OrderBean) b;

 

      int result;

      if (aBean.getOrder_id() > bBean.getOrder_id()) {

         result = 1;

  } else if (aBean.getOrder_id() < bBean.getOrder_id()) {

         result = -1;

      } else {

         result = 0;

      }

 

      return result;

  }

}

(4)编写OrderReducer类

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

 

  @Override

  protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)     throws IOException, InterruptedException {

 

      context.write(key, NullWritable.get());

  }

}

(5)编写OrderDriver类

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OrderDriver {

 

  public static void main(String[] args) throws Exception, IOException {

 

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置

      args  = new String[]{“e:/input/inputorder” , “e:/output1”};

 

      // 1 获取配置信息

      Configuration conf = new Configuration();

      Job job = Job.getInstance(conf);

 

      // 2 设置jar包加载路径

      job.setJarByClass(OrderDriver.class);

 

      // 3 加载map/reduce类

      job.setMapperClass(OrderMapper.class);

      job.setReducerClass(OrderReducer.class);

 

      // 4 设置map输出数据key和value类型

      job.setMapOutputKeyClass(OrderBean.class);

      job.setMapOutputValueClass(NullWritable.class);

 

      // 5 设置最终输出数据的key和value类型

      job.setOutputKeyClass(OrderBean.class);

      job.setOutputValueClass(NullWritable.class);

 

      // 6 设置输入数据和输出数据路径

      FileInputFormat.setInputPaths(job, new Path(args[0]));

      FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

     // 8 设置reduce端的分组

  job.setGroupingComparatorClass(OrderGroupingComparator.class);

 

      // 7 提交

      boolean result = job.waitForCompletion(true);

      System.exit(result ? 0 : 1);

  }

}


上一篇:
下一篇:
关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
电话:010-56253825
邮箱:info@atguigu.com
地址:北京市昌平区宏福科技园综合楼6层(北京校区)

 深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)