前言
这一篇来介绍Spark3.0版本中Spark Sql新增的重要特性AQE
AQE全称Adaptive Query Execution,在3.0版本中主要包含以下三个功能
(1)Dynamically coalescing shuffle partitions
(2)Dynamically switching join strategies
(3)Dynamically optimizing skew joins
动态缩小shuffle分区数
在Spark中运行查询处理非常大的数据时,shuffle通常会对查询性能产生非常重要的影响。shuffle是非常昂贵的操作,因为它需要进行网络传输移动数据,以便下游进行计算。
最好的分区取决于数据,但是每个查询的阶段之间的数据大小可能相差很大,这使得该数值难以调整:
(1)如果分区太少,则每个分区的数据量可能会很大,处理这些数据量非常大的分区,可能需要将数据溢写到磁盘(例如,排序和聚合),降低了查询。
(2)如果分区太多,则每个分区的数据量大小可能很小,读取大量小的网络数据块,这也会导致I/O效率低而降低了查询速度。拥有大量的task(一个分区一个task)也会给Spark任务计划程序带来更多负担。
为了解决这个问题,我们可以在任务开始时先设置较多的shuffle分区个数,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区。
例如,假设正在运行select max(i) from tbl groupby j。输入tbl很小,在分组前只有2个分区。那么任务刚初始化时,我们将分区数设置为5,如果没有AQE,Spark将启动五个任务来进行最终聚合,但是其中会有三个非常小的分区,为每个分区启动单独的任务这样就很浪费。
取而代之的是,AQE将这三个小分区合并为一个,因此最终只需三个task而不是五个
(1)编写代码进行对比,先不开启aqe的功能,正常去做一个表与表的join功能,先把广播join关闭。
package com.atguigu.sparksqltuning
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object AqeTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")//为了测试动态缩小分区,关闭广播join
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
useJoin(sparkSession)
}
def useJoin(sparkSession: SparkSession) = {
val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
.withColumnRenamed("discount", "pay_discount")
.withColumnRenamed("createtime", "pay_createtime")
val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
.drop("coursename")
.withColumnRenamed("discount", "cart_discount")
.withColumnRenamed("createtime", "cart_createtime")
saleCourse.join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
.join(coursePay, Seq("orderid", "dt", "dn"), "left")
.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
, "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
"cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
.write.mode(SaveMode.Overwrite).insertInto("dws.dws_salecourse_detail_1")
}
}
(2)打包,运行yarn任务,查看spark ui。join产生shuffle,分区数为200.
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.sparksqltuning.AqeTest spark-sql-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
(3)使用Spark3.0动态缩小分区的新增参数:
1.spark.sql.adaptive.enabled:Spark3.0 AQE开关,默认是false,如果要使用AQE功能,得先设置为true。
2.spark.sql.adaptive.coalescePartitions.enabled:动态缩小分区参数,默认值是true,但是得先保证spark.sql.adaptive.enabled为true。
3.spark.sql.adaptive.coalescePartitions.initialPartitionNum:任务刚启动时的初始分区,此参数可以设置了大点,默认值与spark.sql.shuffle.partition一样为200。
4.spark.sql.adaptive.coalescePartitions.minPartitionNum:进行动态缩小分区,最小缩小至多少分区,最终分区数不会小于此参数。
5.spark.sql.adaptive.advisoryPartitionSizeInBytes:缩小分区或进行拆分分区操作后所期望的每个分区的大小(数据量)。
(4)设置完参数后,再次提交Spark任务,查看相应Stage。可以看到两个join阶段的shuffle分区个数分别被缩小至了16和56
(5)点击第一个join的stage,也就是stage id为5的stage,查看task详情,可以看到多个分区合并后数据量都大致到了参数所设置的期望值20mb。
(6)查看task的运行时间情况
(7)分区虽然得到了优化,但是资源的使用不一定合理利用,这个时候如果yarn资源充足可以使用Spark的动态调度资源的参数进行再次优化。
1.spark.dynamicAllocation.enabled:动态调度资源,Spark会根据任务工作的负载情况,进行动态调整应用程序的资源占用。Apache中默认关闭,CDH中默认开启。此参数为Spark1.2版本的参数。
2.spark.shuffle.service.enabled(与spark.dynamicAllocation.enabled一起使用,3.0后替换成了spark.dynamicAllocation.shuffleTracking.enabled)
(8)不改变Spark-submit命令,再次提交,查看Spark task详情。这个时候就会动态去调度申请executor资源来执行当前任务。
动态选择join策略
Spark支持多种join策略,其中如果join的一张表可以很好地插入内存,那么broadcast shah join通常性能最高。因此,Spark join中,如果小表小于广播大小阀值(默认10mb),Spark将计划进行broadcast hash join。但是,很多事情都会使这种大小估计出错(例如,存在选择性很高的过滤器),或者join关系是一系列的运算符而不是简单的扫描表操作。
为了解决此问题,AQE现在根据最准确的join大小运行时重新计划join策略。从下图实例中可以看出,发现连接的右侧表比左侧表小的多,并且足够小可以进行广播,那么AQE会重新优化,将sort merge join转换成为broadcast hash join
(1)编写两表join代码,保证两张表的数据量都是大于10mb不会自动触发广播join,并且其中一表加上过滤条件并保证经过过滤后的数据小于10mb,先不开启AQE功能进行测试
package com.atguigu.sparksqltuning
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object AqeTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
switchJoinStartegies(sparkSession)
}
def switchJoinStartegies(sparkSession: SparkSession) = {
// val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
.withColumnRenamed("discount", "pay_discount")
.withColumnRenamed("createtime", "pay_createtime")
.where("orderid between 'odid-9999000' and 'odid-9999999'")
val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
.drop("coursename")
.withColumnRenamed("discount", "cart_discount")
.withColumnRenamed("createtime", "cart_createtime")
val tmpdata = coursePay.join(courseShoppingCart, Seq("orderid"), "right")
tmpdata.show()
}
}
(2)提交任务,查看执行计划图,可以看到其中一张表经过filter过滤后,数据量已经非常小了,但是走的join仍然是sort merge join。
(3)使用AQE动态选择join策略参数:
1.spark.sql.adaptive.enabled:AQE开关,默认关闭
2.spark.sql.adaptive.localShuffleReader.enabled:本地shuffle读取器开关,默认为true,需要先将spark.sql.adaptive.enabled设置为true,开启后在shuffle阶段尝试使用本地shuffle读取器进行读取数据,将可以优化的sql从sort merge join转换成broadcast hash join。
(4)代码不做任何修改,开启参数,再次提交任务,查看执行计划图。首先stage会多出一步广播操作
(5)并且在表过滤后,join前会尝试使用本地shuffle读取器进行读取数据,如果满足广播join条件则将表进行广播,触发广播join。
动态优化倾斜任务
当数据在群集中的分区之间分布不均匀时,就会发生数据倾斜。严重的倾斜会大大降低查询性能,尤其对于join。AQE skew join优化会从随机shuffle文件统计信息自动检测到这种倾斜。然后它将倾斜分区拆分成较小的子分区。
例如,下图 A join B,A表中分区A0明显大于其他分区
因此,3.0AQE功能会将A0分区拆分成两个子分区,并且对应连接B0分区
(1)还是拿之前的任务来演示,倾斜join如下图所示,task耗时非常久,数据量非常多
(2)使用AQE功能进行优化,相关参数如下:
1.spark.sql.adaptive.enabled :开启aqe功能,默认关闭
2.spark.sql.adaptive.skewJoin.enabled:开启aqe倾斜join,需要先将spark.sql.adaptive.enabled设置为true。
3.spark.sql.adaptive.skewJoin.skewedPartitionFactor :倾斜因子,如果分区的数据量大于 此因子乘以分区的中位数,并且也大于
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,那么认为是数据倾斜的,默认值为5
4.spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:每个分区的阀值,默认256mb,此参数应该大于spark.sql.adaptive.advisoryPartitionSizeInBytes
5.spark.sql.adaptive.advisoryPartitionSizeInBytes:缩小分区或进行拆分分区操作后所期望的每个分区的大小(数据量)。
(3)设置相应参数进行join,join任务为上图演示的第一个任务,关闭广播join和动态缩小分区进行测试。从上图得知倾斜的task数据量为28.8mb和27.5mb。中位分区数为1.2mb。
(4)所以上图参数设置5*1.2mb<28.8mb和27.5mb,并且也大于
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
所设置的20mb,满足AQE的优化倾斜task的条件。提交yarn任务再次查看stage
(5)可以看到第一个倾斜join的stage的分区个数,由默认的200变成206,原因就是将那2个倾斜的task任务拆分成了6个任务。点击stage查看task详情。
(6)可以看到数据倾斜已被解决,并且拆分后的任务数据量都大致为参数所设置的期望值8mb左右
(7)接下来,结合动态缩小分区功能一起使用,在使用之前,这里会有个问题到底是先合并分区再拆分,还是先拆分再合并。开启动态缩小分区参数。再次运行任务
(8)查看stage和task详情,分区得到合并,点击stage,查看task详情
(9)可以看到倾斜的task没有被优化。原因是什么呢?因为这个时候,中位分区数的数据量发生变化了,变成7.1mb,这个时候7.1mb*倾斜因子5是大于两个倾斜task任务数据量的,所以Spark任务当前任务并没有发生数据倾斜。所以当动态合并缩小分区和动态优化倾斜task两个功能都开启时是先进行合并分区再进行判断倾斜task是否需要拆分的。
(10)将倾斜task修改为2,使7.1mb*2之后小于两个倾斜任务的数据量,再次运行任务。
(11)可以看到Spark此时认为数据倾斜了,join的分区增多了,倾斜的task已被优化拆分。
结论
Spark3.0 AQE功能非常强大,可以动态合并分区、更加精准地选择join策略,自动优化倾斜的join任务。三个功能的默认开关都为true,只需开启aqe参数spark.sql.adaptive.enabled 就可以使用。
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。
上一篇: 用户自定义函数UDF_大数据培训
下一篇: 用户自定义函数UDAF_大数据培训