大数据培训课程Reduce Join案例实操

发布时间:2020年09月17日作者:atguigu浏览次数:1,000

Reduce Join案例实操

1.需求

表4-4 订单数据表t_order

idpidamount
1001011
1002022
1003033
1004014
1005025
1006036

表4-5 商品信息表t_product

pidpname
01小米
02华为
03格力

       将商品信息表中数据根据商品pid合并到订单数据表中。

表4-6 最终数据形式

idpnameamount
1001小米1
1004小米4
1002华为2
1005华为5
1003格力3
1006格力6

2.需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如图4-20所示。

大数据培训

图4-20 Reduce端表合并

3.代码实现

1)创建商品和订合并后的Bean类

package com.atguigu.mapreduce.table; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable;   public class TableBean implements Writable {      private String order_id; // 订单id    private String p_id;      // 产品id    private int amount;       // 产品数量    private String pname;     // 产品名称    private String flag;      // 表的标记      public TableBean() {       super();    }      public TableBean(String order_id, String p_id, int amount, String pname, String flag) {         super();         this.order_id = order_id;       this.p_id = p_id;       this.amount = amount;       this.pname = pname;       this.flag = flag;    }      public String getFlag() {       return flag;    }      public void setFlag(String flag) {       this.flag = flag;    }      public String getOrder_id() {       return order_id;    }      public void setOrder_id(String order_id) {       this.order_id = order_id;    }      public String getP_id() {       return p_id;    }      public void setP_id(String p_id) {       this.p_id = p_id;    }      public int getAmount() {       return amount;    }      public void setAmount(int amount) {       this.amount = amount;    }      public String getPname() {       return pname;    }      public void setPname(String pname) {       this.pname = pname;    }      @Override    public void write(DataOutput out) throws IOException {       out.writeUTF(order_id);       out.writeUTF(p_id);       out.writeInt(amount);       out.writeUTF(pname);       out.writeUTF(flag);    }      @Override    public void readFields(DataInput in) throws IOException {       this.order_id = in.readUTF();       this.p_id = in.readUTF();       this.amount = in.readInt();       this.pname = in.readUTF();       this.flag = in.readUTF();    }      @Override    public String toString() {       return order_id + “\t” + pname + “\t” + amount + “\t” ;    } }

2)编写TableMapper类

package com.atguigu.mapreduce.table; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;   public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{   String name;    TableBean bean = new TableBean();    Text k = new Text();       @Override    protected void setup(Context context) throws IOException, InterruptedException {         // 1 获取输入文件切片       FileSplit split = (FileSplit) context.getInputSplit();         // 2 获取输入文件名称       name = split.getPath().getName();    }      @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             // 1 获取输入数据       String line = value.toString();             // 2 不同文件分别处理       if (name.startsWith(“order”)) {// 订单表处理             // 2.1 切割           String[] fields = line.split(“\t”);                     // 2.2 封装bean对象           bean.setOrder_id(fields[0]);           bean.setP_id(fields[1]);           bean.setAmount(Integer.parseInt(fields[2]));           bean.setPname(“”);           bean.setFlag(“order”);                     k.set(fields[1]);       }else {// 产品表处理             // 2.3 切割           String[] fields = line.split(“\t”);                     // 2.4 封装bean对象           bean.setP_id(fields[0]);           bean.setPname(fields[1]);           bean.setFlag(“pd”);           bean.setAmount(0);           bean.setOrder_id(“”);                     k.set(fields[0]);       }         // 3 写出       context.write(k, bean);    } }

3)编写TableReducer类

package com.atguigu.mapreduce.table; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;   public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {      @Override    protected void reduce(Text key, Iterable<TableBean> values, Context context)   throws IOException, InterruptedException {         // 1准备存储订单的集合       ArrayList<TableBean> orderBeans = new ArrayList<>();       // 2 准备bean对象       TableBean pdBean = new TableBean();         for (TableBean bean : values) {             if (“order”.equals(bean.getFlag())) {// 订单表                // 拷贝传递过来的每条订单数据到集合中              TableBean orderBean = new TableBean();                try {                 BeanUtils.copyProperties(orderBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }                orderBeans.add(orderBean);           } else {// 产品表                try {                 // 拷贝传递过来的产品表到内存中                 BeanUtils.copyProperties(pdBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }           }       }         // 3 表的拼接       for(TableBean bean:orderBeans){             bean.setPname (pdBean.getPname());                     // 4 数据写出去           context.write(bean, NullWritable.get());       }    } }

4)编写TableDriver类

package com.atguigu.mapreduce.table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   public class TableDriver {      public static void main(String[] args) throws Exception {       // 0 根据自己电脑路径重新配置 args = new String[]{“e:/input/inputtable”,”e:/output1″};   // 1 获取配置信息,或者job对象实例       Configuration configuration = new Configuration();       Job job = Job.getInstance(configuration);         // 2 指定本程序的jar包所在的本地路径       job.setJarByClass(TableDriver.class);         // 3 指定本业务job要使用的Mapper/Reducer业务类       job.setMapperClass(TableMapper.class);       job.setReducerClass(TableReducer.class);         // 4 指定Mapper输出数据的kv类型       job.setMapOutputKeyClass(Text.class);       job.setMapOutputValueClass(TableBean.class);         // 5 指定最终输出的数据的kv类型       job.setOutputKeyClass(TableBean.class);       job.setOutputValueClass(NullWritable.class);         // 6 指定job的输入原始文件所在目录       FileInputFormat.setInputPaths(job, new Path(args[0]));       FileOutputFormat.setOutputPath(job, new Path(args[1]));         // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行       boolean result = job.waitForCompletion(true);       System.exit(result ? 0 : 1);    } }

4.测试

运行程序查看结果

1001   小米   1  1001   小米   1  1002   华为   2  1002   华为   2  1003   格力   3  1003   格力   3    

5.总结

大数据培训

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)