玩转Spark Sql优化之解决数据倾斜_大数据培训

发布时间:2021年08月06日作者:atguigu浏览次数:1,042

前言

承接上文,本文演示数据倾斜场景。

仍然是第一篇所提的三张表分表对应课程表、购物车表、支付表,三张表测试数据量分别为课程表3MB,购物车表4.3G,支付表2.3G。

玩转Spark Sql优化之解决数据倾斜(四)

 

数据倾斜现象

根据此业务,为三表join,即课程表join购物车表再join支付表,造数据时,故意将购物车表数据的courseid(课程id)101和103数据各造了500万条,join时产生数据倾斜。通过spark ui查看数据倾斜场景,先将广播join关闭。

玩转Spark Sql优化之解决数据倾斜(四)

 

打成jar包,提交yarn运行任务。数据倾斜是在第一个join,小表join大表,所以我们查看第一个产生shuffle并且分区是36的stage。

玩转Spark Sql优化之解决数据倾斜(四)

 

查看task运行详情,可以看到两个耗时非常久的task

玩转Spark Sql优化之解决数据倾斜(四)

 

查看task详情,点击duration,task根据耗时排序

玩转Spark Sql优化之解决数据倾斜(四)

 

玩转Spark Sql优化之解决数据倾斜(四)

 

可以看到两个数据量都为500万条左右的task,耗时非常久。

解决方案广播join

玩转Spark Sql优化之解决数据倾斜(四)

 

Spark join策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到driver端,再广播到各个executor中,那么再次进行join的时候,就相当于大表的各自分区的数据与小表进行本地join,从而规避了shuffle。

广播join默认值为10MB,


spark.sql.autoBroadcastJoinThreshold参数控制。即当表的数据量小于等于10MB时自动触发广播join。

玩转Spark Sql优化之解决数据倾斜(四)

 

刚才的历史任务中,


spark.sql.autoBroadcastJoinThreshold设置为了-1,禁用了广播join,观察执行计划图

玩转Spark Sql优化之解决数据倾斜(四)

 

玩转Spark Sql优化之解决数据倾斜(四)

 

玩转Spark Sql优化之解决数据倾斜(四)

 

两张表都走了SortMerge Join。那么根据表数据量可以看出大小表,针对小表join大表可以选择广播join进行优化。

让表触发的广播join的方式有三种,分别是参数控制,api控制,hint暗示。

通过参数控制:

玩转Spark Sql优化之解决数据倾斜(四)

 

通过api控制:

玩转Spark Sql优化之解决数据倾斜(四)

 

hint暗示:

sparkSession.sql("select /*+ BROADCAST(school) */ *  from test1 left join test2 on test1.id=test2.id").show
sparkSession.sql("select /*+ BROADCASTJOIN(school) */ *  from test1 left join test2 on test1.id=test2.id").show
sparkSession.sql("select /*+ MAPJOIN(school) */ *  from test1 left join test2 on test1.id=test2.id").show

触发广播join后对应执行计划图也随之改变,产生shuffle的stage也会减少一个

玩转Spark Sql优化之解决数据倾斜(四)

 

玩转Spark Sql优化之解决数据倾斜(四)

 

spark sql中的广播join可以直接规避shuffle阶段,来优化掉数据倾斜的问题

打散大表扩容小表

此方案是先将大表打散比如,现在业务中courseid是101和103两个值过多产生了数据倾斜,那么这个时候可以将大表的数据针对courseid进行打散操作,加上随机值,比如在courseid前加上0-9的随机值打散10分形成0_101,1_101,…,9_101。

打散之后为了让大表和小表join上,那么小表需进行扩容操作和大表对应的随机key能匹配上,那么小表就需要进行扩容操作。小表当中比如存在1条101数据,那么这个时候小表数据量得扩大10倍,并且加上前缀让key变成0_101,1_101,…,9_101。这样能与大表数据join上。

以下为代码处理,这里的实现思路

1.打散大表:实际就是数据一进一出进行处理,对courseid前拼上随机前缀实现打散

2.扩容小表:实际就是将DataFrame中每一条数据,转成一个集合,并往这个集合里循环添加10条数据,最后使用flatmap压平此集合,达到扩容的效果.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}


import scala.collection.mutable.ArrayBuffer
import scala.util.Random


object PartitionTuning {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test")
      .set("spark.sql.shuffle.partitions", "36")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    testJoin2(sparkSession)
  }
/**
    * 打散大表  扩容小表 解决数据倾斜
    *
    * @param sparkSession
    */
  def testJoin2(sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._
    val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
    val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
      .withColumnRenamed("discount", "pay_discount")
      .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
      .withColumnRenamed("discount", "cart_discount")
      .withColumnRenamed("createtime", "cart_createtime")
    //将大表打散  打散10份
    val newCourseShoppingCart = courseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
      partitions.map(item => {
        val courseid = item.getAs[Int]("courseid")
        val randInt = Random.nextInt(10)
        DwdCourseShoppingCart(courseid, item.getAs[String]("orderid"),
          item.getAs[String]("coursename"), item.getAs[java.math.BigDecimal]("cart_discount"),
          item.getAs[java.math.BigDecimal]("sellmoney"), item.getAs[java.sql.Timestamp]("cart_createtime"),
          item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
      })
    })
    //小表进行扩容 扩大10倍
    val newSaleCourse = saleCourse.flatMap(item => {
      val list = new ArrayBuffer[DwdSaleCourse]()
      val courseid = item.getAs[Int]("courseid")
      val coursename = item.getAs[String]("coursename")
      val status = item.getAs[String]("status")
      val pointlistid = item.getAs[Int]("pointlistid")
      val majorid = item.getAs[Int]("majorid")
      val chapterid = item.getAs[Int]("chapterid")
      val chaptername = item.getAs[String]("chaptername")
      val edusubjectid = item.getAs[Int]("edusubjectid")
      val edusubjectname = item.getAs[String]("edusubjectname")
      val teacherid = item.getAs[Int]("teacherid")
      val teachername = item.getAs[String]("teachername")
      val coursemanager = item.getAs[String]("coursemanager")
      val money = item.getAs[java.math.BigDecimal]("money")
      val dt = item.getAs[String]("dt")
      val dn = item.getAs[String]("dn")
      for (i <- 0 until 10) {
        list.append(DwdSaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
          edusubjectname, teacherid, teachername, coursemanager, money, dt, dn,i+ "_" + courseid))
      }
      list
    })
newSaleCourse.join(newCourseShoppingCart.drop("courseid").drop("coursename"),
  Seq("rand_courseid", "dt", "dn"), "right")
  .join(coursePay, Seq("orderid", "dt", "dn"), "left")
  .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
  .write.mode(SaveMode.Overwrite).insertInto("dws.dws_salecourse_detail")


  }


  case class DwdCourseShoppingCart(courseid: Int,
                                   orderid: String,
                                   coursename: String,
                                   cart_discount: java.math.BigDecimal,
                                   sellmoney: java.math.BigDecimal,
                                   cart_createtime: java.sql.Timestamp,
                                   dt: String,
                                   dn: String,
                                   rand_courseid: String)


  case class DwdSaleCourse(courseid: Int,
                           coursename: String,
                           status: String,
                           pointlistid: Int,
                           majorid: Int,
                           chapterid: Int,
                           chaptername: String,
                           edusubjectid: Int,
                           edusubjectname: String,
                           teacherid: Int,
                           teachername: String,
                           coursemanager: String,
                           money: java.math.BigDecimal,
                           dt: String,
                           dn: String,
                           rand_courseid: String)


}

打开jar包,提交yarn任务,查看spark ui图。观察task详情

玩转Spark Sql优化之解决数据倾斜(四)

 

结论

在Spark Sql当中join时产生了数据倾斜,可以采用广播join和打散大表扩容小表两种方案解决,广播join可以直接规避shuffle阶段,广播join默认值为10mb,可以通过参数适当调大,也可以通过API和Hint触发。

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园(成都校区)