玩转Spark Sql优化之SMB Join_大数据培训

发布时间:2021年08月16日作者:atguigu浏览次数:833

前言

承接Spark Sql优化方案上文,上篇介绍了Spark Sql当中小表join大表可以使用广播join优化,本篇就介绍大表join大表的优化。

还是这三张表,这次演示购物车表和支付表的join,两张表的测试数据大小为4.7G和2.3G。

玩转Spark Sql优化之SMB Join(五)

 

三表join

三张表先正常进行join,先让两张大表join,再与课程表小表join。

package com.atguigu.sparksqltuning


import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.{SaveMode, SparkSession}
object SMBJoinTuning {


  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test")
      .set("spark.sql.shuffle.partitions", "36")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
     useJoin(sparkSession)
  }
  def useJoin(sparkSession: SparkSession) = {
    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
    val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
      .withColumnRenamed("discount", "pay_discount")
      .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
      .drop("coursename")
      .withColumnRenamed("discount", "cart_discount")
      .withColumnRenamed("createtime", "cart_createtime")
      //大表先与大表join
    val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
      //再与小表join,并且走广播join
    val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
    result.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
      , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
      "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dwd.dwd_sale_course.dt", "dwd.dwd_sale_course.dn")
      .write.mode(SaveMode.Overwrite).insertInto("dws.dws_salecourse_detail_1")
  }

编写代码后,打成jar包运行yarn任务,查看对应执行计划图。

玩转Spark Sql优化之SMB Join(五)

 

可以看到两张大表join前的排序时间分别为31.1秒和19秒,那么针对这个排序时间就可以进行优化。

玩转Spark Sql优化之SMB Join(五)

 

SMB JOIN

SMB JOIN用于大表join大表的优化。

SMB JOIN是sort merge bucket操作,需要进行分桶,首先会进行排序,然后根据key值合并,把相同key的数据放到同一个bucket中(按照key进行hash)。分桶的目的其实就是把大表化成“小表”(多个桶)。相同key的数据都在同一个桶中之后,再进行join操作,那么在联合的时候就会大幅度地减小无关项的扫描。

使用条件:

(1)两表进行分桶,桶的个数必须相等

(2)两边进行join时,join列==排序列==分桶列

hive当中也有此功能需要开启相应参数

玩转Spark Sql优化之SMB Join(五)

 

那么接下来就先用Spark对表进行分桶

(1)分桶操作,并且保证两张表的桶的个数都相等,分桶列和排序列都是join列

  def useBucket(sparkSession: SparkSession) = {
    sparkSession.read.json("/user/atguigu/ods/coursepay.log")
      .write.partitionBy("dt", "dn")
      .format("parquet")
      .bucketBy(10, "orderid")
      .sortBy("orderid").mode(SaveMode.Overwrite)
      .saveAsTable("dwd.dwd_course_pay_cluster")
    sparkSession.read.json("/user/atguigu/ods/courseshoppingcart.log")
      .write.partitionBy("dt", "dn")
      .bucketBy(10, "orderid")
      .format("parquet")
      .sortBy("orderid").mode(SaveMode.Overwrite)
      .saveAsTable("dwd.dwd_course_shopping_cart_cluster")
  }

(2)执行分桶任务

玩转Spark Sql优化之SMB Join(五)

 

(3)分完桶之后,Spark Sql查询分桶表,进行join。步骤不变,只是查询的表发生了变化,查询分桶表。

def useSMBJoin(sparkSession: SparkSession) = {
    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
    val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay_cluster")
      .withColumnRenamed("discount", "pay_discount")
      .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart_cluster")
      .drop("coursename")
      .withColumnRenamed("discount", "cart_discount")
      .withColumnRenamed("createtime", "cart_createtime")
      //两张分桶表进行 join
    val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
      //再与小表进行 join
    val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
    result.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
      , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
      "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dwd.dwd_sale_course.dt", "dwd.dwd_sale_course.dn")
      .write.mode(SaveMode.Overwrite).saveAsTable("dws.dws_salecourse_detail_2")


  }

(4)提交任务查看,查看stage,可以看到分完桶之后,再进行Spark的shuffle操作,task也会发生变化,分区个数与桶个数相等。

玩转Spark Sql优化之SMB Join(五)

 

(5)查看执行计划图

玩转Spark Sql优化之SMB Join(五)

 

可以看到排序时间,分别缩减为了7.1秒和9.6秒

(测试数据不是特别多,不是特别明显)。

玩转Spark Sql优化之SMB Join(五)

 

结论

在表与表进行join时,如果两张表都是非常大的数据量,那么可以考虑使用分桶进行join。SMB Join时得保证两表桶数量相等,join列等于排序列等于分桶列。

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)