一:程序
1.需求
实现一个求平均值的UDAF。
这里保留Double格式化,在完成求平均值后与系统的AVG进行对比,观察正确性。
2.SparkSQLUDFDemo程序
package com.scala.it import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext} import scala.math.BigDecimal.RoundingMode object SparkSQLUDFDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("udf")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new HiveContext(sc) // ==================================
// 写一个Double数据格式化的自定义函数(给定保留多少位小数部分)
sqlContext.udf.register(
"doubleValueFormat", // 自定义函数名称
(value: Double, scale: Int) => {
// 自定义函数处理的代码块
BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue()
}) // 自定义UDAF
sqlContext.udf.register("selfAvg", AvgUDAF) sqlContext.sql(
"""
|SELECT
| deptno,
| doubleValueFormat(AVG(sal), 2) AS avg_sal,
| doubleValueFormat(selfAvg(sal), 2) AS self_avg_sal
|FROM hadoop09.emp
|GROUP BY deptno
""".stripMargin).show() }
}
3.AvgUDAF程序
package com.scala.it import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._ object AvgUDAF extends UserDefinedAggregateFunction{
override def inputSchema: StructType = {
// 给定UDAF的输出参数类型
StructType(
StructField("sal", DoubleType) :: Nil
)
} override def bufferSchema: StructType = {
// 在计算过程中会涉及到的缓存数据类型
StructType(
StructField("total_sal", DoubleType) ::
StructField("count_sal", LongType) :: Nil
)
} override def dataType: DataType = {
// 给定该UDAF返回的数据类型
DoubleType
} override def deterministic: Boolean = {
// 主要用于是否支持近似查找,如果为false:表示支持多次查询允许结果不一样,为true表示结果必须一样
true
} override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 初始化 ===> 初始化缓存数据
buffer.update(0, 0.0) // 初始化total_sal
buffer.update(1, 0L) // 初始化count_sal
} override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 根据输入的数据input,更新缓存buffer的内容
// 获取输入的sal数据
val inputSal = input.getDouble(0) // 获取缓存中的数据
val totalSal = buffer.getDouble(0)
val countSal = buffer.getLong(1) // 更新缓存数据
buffer.update(0, totalSal + inputSal)
buffer.update(1, countSal + 1L)
} override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
// 当两个分区的数据需要进行合并的时候,该方法会被调用
// 功能:将buffer2中的数据合并到buffer1中
// 获取缓存区数据
val buf1Total = buffer1.getDouble(0)
val buf1Count = buffer1.getLong(1) val buf2Total = buffer2.getDouble(0)
val buf2Count = buffer2.getLong(1) // 更新缓存区
buffer1.update(0, buf1Total + buf2Total)
buffer1.update(1, buf1Count + buf2Count)
} override def evaluate(buffer: Row): Any = {
// 求返回值
buffer.getDouble(0) / buffer.getLong(1)
}
}
4.效果
二:知识点
1.udf注册
2.解释上面的update
重要的是两个参数的意思,不然程序有些看不懂。
所以,程序的意思是,第一位存储总数,第二位存储个数。
3.还要解释一个StructType的生成
在以前的程序中,是使用Array来生成的。如:
在上面的程序中,不是这种方式,使用集合的方式。