前言
承接Spark Sql优化方案上文,上篇介绍了Spark Sql当中小表join大表可以使用广播join优化,本篇就介绍大表join大表的优化。
还是这三张表,这次演示购物车表和支付表的join,两张表的测试数据大小为4.7G和2.3G。
三表join
三张表先正常进行join,先让两张大表join,再与课程表小表join。
package com.atguigu.sparksqltuning
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.{SaveMode, SparkSession}
object SMBJoinTuning {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("test")
.set("spark.sql.shuffle.partitions", "36")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
useJoin(sparkSession)
}
def useJoin(sparkSession: SparkSession) = {
//查询出三张表 并进行join 插入到最终表中
val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay")
.withColumnRenamed("discount", "pay_discount")
.withColumnRenamed("createtime", "pay_createtime")
val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart")
.drop("coursename")
.withColumnRenamed("discount", "cart_discount")
.withColumnRenamed("createtime", "cart_createtime")
//大表先与大表join
val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
//再与小表join,并且走广播join
val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
result.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
, "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
"cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dwd.dwd_sale_course.dt", "dwd.dwd_sale_course.dn")
.write.mode(SaveMode.Overwrite).insertInto("dws.dws_salecourse_detail_1")
}
编写代码后,打成jar包运行yarn任务,查看对应执行计划图。
可以看到两张大表join前的排序时间分别为31.1秒和19秒,那么针对这个排序时间就可以进行优化。
SMB JOIN
SMB JOIN用于大表join大表的优化。
SMB JOIN是sort merge bucket操作,需要进行分桶,首先会进行排序,然后根据key值合并,把相同key的数据放到同一个bucket中(按照key进行hash)。分桶的目的其实就是把大表化成“小表”(多个桶)。相同key的数据都在同一个桶中之后,再进行join操作,那么在联合的时候就会大幅度地减小无关项的扫描。
使用条件:
(1)两表进行分桶,桶的个数必须相等
(2)两边进行join时,join列==排序列==分桶列
hive当中也有此功能需要开启相应参数
那么接下来就先用Spark对表进行分桶
(1)分桶操作,并且保证两张表的桶的个数都相等,分桶列和排序列都是join列
def useBucket(sparkSession: SparkSession) = {
sparkSession.read.json("/user/atguigu/ods/coursepay.log")
.write.partitionBy("dt", "dn")
.format("parquet")
.bucketBy(10, "orderid")
.sortBy("orderid").mode(SaveMode.Overwrite)
.saveAsTable("dwd.dwd_course_pay_cluster")
sparkSession.read.json("/user/atguigu/ods/courseshoppingcart.log")
.write.partitionBy("dt", "dn")
.bucketBy(10, "orderid")
.format("parquet")
.sortBy("orderid").mode(SaveMode.Overwrite)
.saveAsTable("dwd.dwd_course_shopping_cart_cluster")
}
(2)执行分桶任务
(3)分完桶之后,Spark Sql查询分桶表,进行join。步骤不变,只是查询的表发生了变化,查询分桶表。
def useSMBJoin(sparkSession: SparkSession) = {
//查询出三张表 并进行join 插入到最终表中
val saleCourse = sparkSession.sql("select *from dwd.dwd_sale_course")
val coursePay = sparkSession.sql("select * from dwd.dwd_course_pay_cluster")
.withColumnRenamed("discount", "pay_discount")
.withColumnRenamed("createtime", "pay_createtime")
val courseShoppingCart = sparkSession.sql("select *from dwd.dwd_course_shopping_cart_cluster")
.drop("coursename")
.withColumnRenamed("discount", "cart_discount")
.withColumnRenamed("createtime", "cart_createtime")
//两张分桶表进行 join
val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
//再与小表进行 join
val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
result.select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
, "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
"cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dwd.dwd_sale_course.dt", "dwd.dwd_sale_course.dn")
.write.mode(SaveMode.Overwrite).saveAsTable("dws.dws_salecourse_detail_2")
}
(4)提交任务查看,查看stage,可以看到分完桶之后,再进行Spark的shuffle操作,task也会发生变化,分区个数与桶个数相等。
(5)查看执行计划图
可以看到排序时间,分别缩减为了7.1秒和9.6秒
(测试数据不是特别多,不是特别明显)。
结论
在表与表进行join时,如果两张表都是非常大的数据量,那么可以考虑使用分桶进行join。SMB Join时得保证两表桶数量相等,join列等于排序列等于分桶列。
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。
上一篇: IDEA创建SparkSQL程序_大数据培训
下一篇: 用户自定义函数UDF_大数据培训