用户自定义函数UDAF_大数据培训

发布时间:2021年08月18日作者:atguigu浏览次数:556

UDAF

强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。

1)需求:实现求平均工资的自定义聚合函数。

2)代码实现

import org.apache.spark.sql.expressions.MutableAggregationBuffer

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import org.apache.spark.sql.types._

import org.apache.spark.sql.Row

import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {

// 聚合函数输入参数的数据类型

def inputSchema: StructType = StructType(StructField(“inputColumn”, LongType) :: Nil)

// 聚合缓冲区中值得数据类型

def bufferSchema: StructType = {

StructType(StructField(“sum”, LongType) :: StructField(“count”, LongType) :: Nil)

}

// 返回值的数据类型

def dataType: DataType = DoubleType

// 对于相同的输入是否一直返回相同的输出。

def deterministic: Boolean = true

// 初始化

def initialize(buffer: MutableAggregationBuffer): Unit = {

// 存工资的总额

buffer(0) = 0L

// 存工资的个数

buffer(1) = 0L

}

// 同一个分区数据合并。

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

if (!input.isNullAt(0)) {

buffer(0) = buffer.getLong(0) + input.getLong(0)

buffer(1) = buffer.getLong(1) + 1

}

}

// 不同分区间数据合并

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)

}

// 计算最终结果

def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)

}

3)函数使用

用户自定义函数UDAF_大数据培训

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦3层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)

成都市成华区北辰星拱青创园综合楼3层(成都校区)