尚硅谷大数据技术之Hadoop(MapReduce)(新)第3章 MapReduce框架原理
3.3.10 GroupingComparator分组案例实操
1.需求
有如下订单数据
表4-2 订单数据
| 订单id | 商品id | 成交金额 |
| 0000001 | Pdt_01 | 222.8 |
| 0000001 | Pdt_02 | 33.8 |
| 0000002 | Pdt_03 | 522.8 |
| 0000002 | Pdt_04 | 122.4 |
| 0000002 | Pdt_05 | 722.4 |
| 0000003 | Pdt_06 | 232.8 |
| 0000003 | Pdt_02 | 33.8 |
现在需要求出每一个订单中最贵的商品。
(1)输入数据
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
(2)期望输出数据
1 222.8
2 722.4
3 232.8
2.需求分析
(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
(2)在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如图4-18所示。
图4-18 过程分析
3.代码实现
(1)定义订单信息OrderBean类
package com.atguigu.mapreduce.order; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean> {
private int order_id; // 订单id号 private double price; // 价格
public OrderBean() { super(); }
public OrderBean(int order_id, double price) { super(); this.order_id = order_id; this.price = price; }
@Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); }
@Override public void readFields(DataInput in) throws IOException { order_id = in.readInt(); price = in.readDouble(); }
@Override public String toString() { return order_id + "\t" + price; }
public int getOrder_id() { return order_id; }
public void setOrder_id(int order_id) { this.order_id = order_id; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
// 二次排序 @Override public int compareTo(OrderBean o) {
int result;
if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 价格倒序排序 result = price > o.getPrice() ? -1 : 1; }
return result; } } |
(2)编写OrderMapper类
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 封装对象 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(k, NullWritable.get()); } } |
(3)编写OrderGroupingComparator类
package com.atguigu.mapreduce.order; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() { super(OrderBean.class, true); }
@Override public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b;
int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; }
return result; } } |
(4)编写OrderReducer类
package com.atguigu.mapreduce.order; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } |