Class summary.

Time: 2025-04-16 13:14:47

Chapter 3, Section 6: Spark-SQL Core Programming (V), Custom Functions

UDF

Register a plain Scala function with spark.udf.register and it becomes callable from Spark SQL:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
// Create the SparkSession
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

// Read the JSON file
val df: DataFrame = spark.read.json("Spark-SQL/input/user.json")

// Register a UDF that prefixes each username with "Name:"
spark.udf.register("addName", (x: String) => "Name:" + x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username), age from people").show()

spark.stop()
```

UDAF (user-defined aggregate function)

Both the strongly typed Dataset and the weakly typed DataFrame provide common aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions. Before Spark 3.0, custom aggregates were written against UserDefinedAggregateFunction; from Spark 3.0 on, the strongly typed Aggregator can be used uniformly.

Exercise: compute the average salary.

Implementation 1: RDD

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)

// Map each (name, salary) pair to (salary, 1), then sum both components
val res: (Int, Int) = sc
  .makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
  .map { case (_, salary) => (salary, 1) }
  .reduce { (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2) }

// Integer division: prints 30 for this data
println(res._1 / res._2)

// Close the connection
sc.stop()
```

Implementation 2: weakly typed UDAF (UserDefinedAggregateFunction, deprecated since Spark 3.0)

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyAverageUDAF extends UserDefinedAggregateFunction {
  // Input schema: a single salary column
  def inputSchema: StructType = StructType(Array(StructField("salary", IntegerType)))

  // Buffer schema: running (sum, count)
  def bufferSchema: StructType =
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))

  // Return type of the function
  def dataType: DataType = DoubleType

  // Deterministic: the same input always yields the same output
  def deterministic: Boolean = true

  // Initialize the buffer
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L // sum of salaries
    buffer(1) = 0L // count of salaries
  }

  // Fold one input row into the buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  // Merge two buffers
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

val sparkConf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

val res: RDD[(String, Int)] =
  spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
val df: DataFrame = res.toDF("name", "salary")
df.createOrReplaceTempView("user")

// Register the aggregate function with Spark
val myAverage = new MyAverageUDAF
spark.udf.register("avgSalary", myAverage)
spark.sql("select avgSalary(salary) from user").show()

// Close the connection
spark.stop()
```

Implementation 3: strongly typed UDAF (Aggregator)

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Buff(var sum: Long, var cnt: Long)

class MyAverageUDAF extends Aggregator[Long, Buff, Double] {
  // Initial (zero) value of the buffer
  override def zero: Buff = Buff(0L, 0L)

  // Fold one input value into the buffer
  override def reduce(b: Buff, a: Long): Buff = {
    b.sum += a
    b.cnt += 1
    b
  }

  // Merge two buffers
  override def merge(b1: Buff, b2: Buff): Buff = {
    b1.sum += b2.sum
    b1.cnt += b2.cnt
    b1
  }

  // Compute the final result
  override def finish(reduction: Buff): Double = reduction.sum.toDouble / reduction.cnt

  // Encoders for the buffer and the output (required by the Aggregator API)
  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```
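The notes cut off before the Aggregator above is registered or used. As a minimal sketch of the missing step, assuming the SparkSession and import spark.implicits._ from implementation two are in scope and referring to the Aggregator-based MyAverageUDAF from implementation three: since Spark 3.0, functions.udaf wraps an Aggregator so it can be called from SQL.

```scala
import org.apache.spark.sql.functions

// Sample data with Long salaries, matching the Aggregator's input type
val df = List(("zhangsan", 20L), ("lisi", 30L), ("wangwu", 40L)).toDF("name", "salary")
df.createOrReplaceTempView("user")

// Wrap the strongly typed Aggregator as a SQL-callable function (Spark 3.0+)
spark.udf.register("avgSalary", functions.udaf(new MyAverageUDAF))
spark.sql("select avgSalary(salary) from user").show()

// Sanity check: the built-in avg() should return the same value
spark.sql("select avg(salary) from user").show()
```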
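Alternatively, an Aggregator can be applied directly to a typed Dataset through toColumn, with no SQL registration at all. A short sketch under the same assumptions:

```scala
import org.apache.spark.sql.Dataset

// A strongly typed Dataset of salaries
val salaries: Dataset[Long] = List(20L, 30L, 40L).toDS()

// toColumn turns the Aggregator into a TypedColumn usable in a typed select
salaries.select(new MyAverageUDAF().toColumn.name("avg_salary")).show()
```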