源码级解读如何解决Spark-sql读取hive分区表执行效率低问题

发布时间:2021年08月27日作者:atguigu浏览次数:904

在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、磁盘IO大量损耗等问题。

解决办法

1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。

type ExtensionsBuilder = SparkSessionExtensions => Unit//在Optimizer中追加CheckPartitionTable规则执行器
    val extBuilder: ExtensionsBuilder = { e => e.injectOptimizerRule(CheckPartitionTable) }
    val conf = new SparkConf()
        .setMaster("local[*]")
        .set("spark.table.check.partition", "true")
        .set("spark.table.check.partition.num","30")
        .setAppName("SQL")
    val spark = SparkSession
        .builder()
        .config(conf)
        .withExtensions(extBuilder)
        .enableHiveSupport()
        .getOrCreate()

2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizer.batches: Seq[Batch]中,如下。

源码级解读如何解决Spark-sql读取hive分区表执行效率低问题

 

规则内容实现

1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];

case class CheckPartitionTable(sparkSession: SparkSession)
  extends Rule[LogicalPlan] with PredicateHelper {
  // 是否检查分区,配置
  val check_partition = "spark.table.check.partition"
  // 检查分区,限制分区读取数量,配置
  val check_num_partition = "spark.table.check.partition.num"
  val conf = sparkSession.conf

2、通过splitPredicates方法,分离分区谓词,得到分区谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。

def splitPredicates(condition: Expression,partitionSet :AttributeSet): Seq[Expression] = {
    condition match {
       //匹配and表达式,并筛选and表达式中的分区表达式
       case And(cond1, cond2) =>
          splitPredicates(cond1,partitionSet) ++ splitPredicates(cond2,partitionSet)
       //匹配or表达式,并筛选or表达式中的分区表达式
       case Or(cond1, cond2)=>
          val leftSeq = splitPredicates(cond1,partitionSet)                                                    
          val rightSeq = splitPredicates(cond2,partitionSet)        
          if(leftSeq.nonEmpty && rightSeq.nonEmpty)          
            Or(leftSeq.reduceLeft(And),rightSeq.reduceLeft(And)) :: Nil        
          else Nil      
        case other  => if (other.references.subsetOf(partitionSet)) other :: Nil else Nil    
       }  
     }

3、判断是否是分区表,且是否添加分区字段。

def isPartitionTable(filter: Filter,numPartition:Int): Boolean = {    
  var boolean = false    
  filter.child match {      
    // 匹配logicalRelation      
    case logicalRelation@LogicalRelation(fsRelation@HadoopFsRelation(location: CatalogFileIndex,      
    partitionSchema: StructType, _, _, _, _), _, catalogTable) =>        
      val table = catalogTable.get        
      // 判断读取表是否存在分区column        
      if (table.partitionColumnNames.nonEmpty) {          
        val sparkSession = fsRelation.sparkSession          
        // 获取表的分区column的Attribute          
        val partitionColumns =            
          logicalRelation.resolve(              
            partitionSchema, sparkSession.sessionState.analyzer.resolver)          
        log.info("partitionColumns : " + partitionColumns)          
        val partitionSet = AttributeSet(partitionColumns)          
       //获取分区Filter表达式          
        val partitionKeyFilters = splitPredicates(filter.condition,partitionSet)          
        var partition_size = -1L          
        if (partitionKeyFilters.nonEmpty) {            
          log.info("partitionKeyFiltersExpression:" + partitionKeyFilters)            
         //在hdfs上获取分区path            
          val prunedFileIndex = location.filterPartitions(partitionKeyFilters)            
          val partitions = prunedFileIndex.partitionSpec().partitions            
          partition_size = partitions.size            
          log.info("partitions : " + partitions)          
        }          
        boolean = partitionKeyFilters.isEmpty || partition_size > numPartition        
      }      
    //匹配 CatalogRelation      
    case catalogRelation:CatalogRelation =>        
      val partitionSet = AttributeSet(catalogRelation.partitionCols)        
      val partitionKeyFilters = splitPredicates(filter.condition,partitionSet)        
      // 判断是否存在分区属性        
      boolean = partitionKeyFilters.forall(_.references.subsetOf(partitionSet))      
    case _ => log.warn("未获取到表信息")    
  }    
  boolean  
}

4、实现Rule的apply方法

def apply(plan: LogicalPlan): LogicalPlan =    
  if (!conf.get(check_partition, "true").toBoolean) {      
    log.warn(s"Is not enabled $check_partition")      
    plan    
  } 
  else plan transform {      
    case j@Filter(condition: Expression, child: LogicalPlan)        
    if isPartitionTable(j,conf.get(check_num_partition,s"${Int.MaxValue}").toInt) =>        
    throw new Exception(          
      s"""
        ${condition.sql} ${child.treeString} No partition information is added to the partition table
      """.stripMargin)
   }

想要了解跟多关于

大数据培训

课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。

上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)