8. Dataset (DataFrame) 的基础操作
导读这一章节主要目的是介绍 Dataset
的基础操作, 当然, DataFrame
就是 Dataset
, 所以这些操作大部分也适用于 DataFrame
-
有类型的转换操作
-
无类型的转换操作
-
基础
Action
-
空值如何处理
-
统计操作
8.1. 有类型操作
分类 | 算子 | 解释 |
---|---|---|
转换 |
|
通过 |
|
|
|
|
|
|
|
|
|
|
|
|
过滤 |
|
|
聚合 |
|
其实这也印证了分组后必须聚合的道理 |
切分 |
|
|
|
|
|
排序 |
|
|
|
其实 |
|
分区 |
|
减少分区, 此算子和 |
|
|
|
去重 |
|
使用 |
|
当 所以, 使用 |
|
集合操作 |
|
|
|
求得两个集合的交集 |
|
|
求得两个集合的并集 |
|
|
限制结果集数量 |
8.2. 无类型转换
分类 | 算子 | 解释 |
---|---|---|
选择 |
|
|
|
在 |
|
|
通过 |
|
|
修改列名 |
|
剪除 |
drop |
剪掉某个列 |
聚合 |
groupBy |
按照给定的行进行分组 |
8.5. Column 对象
导读Column 表示了 Dataset 中的一个列, 并且可以持有一个表达式, 这个表达式作用于每一条数据, 对每条数据都生成一个值, 之所以有单独这样的一个章节是因为列的操作属于细节, 但是又比较常见, 会在很多算子中配合出现
分类 | 操作 | 解释 |
---|---|---|
创建 |
|
单引号 |
|
同理, |
|
|
|
|
|
|
|
|
前面的 |
|
|
可以通过
|
|
别名和转换 |
|
|
|
通过 |
|
添加列 |
|
通过 |
操作 |
|
通过 |
|
通过 |
|
|
在排序的时候, 可以通过 |
9. 缺失值处理
导读-
DataFrame
中什么时候会有无效值 -
DataFrame
如何处理无效的值 -
DataFrame
如何处理null
- 缺失值的处理思路
-
如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值
- 什么是缺失值
-
一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说
null
, 比如说空字符串关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响
- 缺失值如何产生的
-
Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据
例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改
MySQL
表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事. - 缺失值的类型
-
常见的缺失值有两种
-
null
,NaN
等特殊类型的值, 某些语言中null
可以理解是一个对象, 但是代表没有对象,NaN
是一个数字, 可以代表不是数字针对这一类的缺失值,
Spark
提供了一个名为DataFrameNaFunctions
特殊类型来操作和处理 -
"Null"
,"NA"
," "
等解析为字符串的类型, 但是其实并不是常规字符串数据针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破
-
-
DataFrameNaFunctions
-
DataFrameNaFunctions
使用Dataset
的na
函数来获取val df = ... val naFunc: DataFrameNaFunctions = df.na
当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值,
DataFrameNaFunctions
中包含一系列针对空值数据的方案-
DataFrameNaFunctions.drop
可以在当某行中包含null
或NaN
的时候丢弃此行 -
DataFrameNaFunctions.fill
可以在将null
和NaN
充为其它值 -
DataFrameNaFunctions.replace
可以把null
或NaN
替换为其它值, 但是和fill
略有一些不同, 这个方法针对值来进行替换
-
-
如何使用
SparkSQL
处理null
和NaN
? -
首先要将数据读取出来, 此次使用的数据集直接存在
NaN
, 在指定Schema
后, 可直接被转为Double.NaN
val schema = StructType( List( StructField("id", IntegerType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) val df = spark.read .option("header", value = true) .schema(schema) .csv("dataset/beijingpm_with_nan.csv")
对于缺失值的处理一般就是丢弃和填充
-
丢弃包含
null
和NaN
的行 -
当某行数据所有值都是
null
或者NaN
的时候丢弃此行df.na.drop("all").show()
当某行中特定列所有值都是
null
或者NaN
的时候丢弃此行df.na.drop("all", List("pm", "id")).show()
当某行数据任意一个字段为
null
或者NaN
的时候丢弃此行df.na.drop().show() df.na.drop("any").show()
当某行中特定列任意一个字段为
null
或者NaN
的时候丢弃此行df.na.drop(List("pm", "id")).show() df.na.drop("any", List("pm", "id")).show()
-
填充包含
null
和NaN
的列 -
填充所有包含
null
和NaN
的列df.na.fill(0).show()
填充特定包含
null
和NaN
的列df.na.fill(0, List("pm")).show()
根据包含
null
和NaN
的列的不同来填充import scala.collection.JavaConverters._ df.na.fill(Map[String, Any]("pm" -> 0).asJava).show
-
丢弃包含
-
如何使用
SparkSQL
处理异常字符串 ? -
读取数据集, 这次读取的是最原始的那个
PM
数据集val df = spark.read .option("header", value = true) .csv("dataset/BeijingPM20100101_20151231.csv")
使用函数直接转换非法的字符串
df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, when(‘PM_Dongsi === "NA", 0) .otherwise(‘PM_Dongsi cast DoubleType) .as("pm")) .show()
使用
where
直接过滤df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi) .where(‘PM_Dongsi =!= "NA") .show()
使用
DataFrameNaFunctions
替换, 但是这种方式被替换的值和新值必须是同类型df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi) .na.replace("PM_Dongsi", Map("NA" -> "NaN")) .show()
10. 聚合
导读-
groupBy
-
rollup
-
cube
-
pivot
-
RelationalGroupedDataset
上的聚合操作
-
groupBy
-
groupBy
算子会按照列将Dataset
分组, 并返回一个RelationalGroupedDataset
对象, 通过RelationalGroupedDataset
可以对分组进行聚合- Step 1: 加载实验数据
-
private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ private val schema = StructType( List( StructField("id", IntegerType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) private val pmDF = spark.read .schema(schema) .option("header", value = true) .csv("dataset/pm_without_null.csv")
-
Step 2: 使用
functions
函数进行聚合 -
import org.apache.spark.sql.functions._ val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year) groupedDF.agg(avg(‘pm) as "pm_avg") .orderBy(‘pm_avg) .show()
-
Step 3: 除了使用
functions
进行聚合, 还可以直接使用RelationalGroupedDataset
的API
进行聚合 -
groupedDF.avg("pm") .orderBy(‘pm_avg) .show() groupedDF.max("pm") .orderBy(‘pm_avg) .show()
- 多维聚合
-
我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下
- Step 1: 准备数据
-
private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ private val schemaFinal = StructType( List( StructField("source", StringType), StructField("year", IntegerType), StructField("month", IntegerType), StructField("day", IntegerType), StructField("hour", IntegerType), StructField("season", IntegerType), StructField("pm", DoubleType) ) ) private val pmFinal = spark.read .schema(schemaFinal) .option("header", value = true) .csv("dataset/pm_final.csv")
- Step 2: 进行多维度聚合
-
import org.apache.spark.sql.functions._ val groupPostAndYear = pmFinal.groupBy(‘source, ‘year) .agg(sum("pm") as "pm") val groupPost = pmFinal.groupBy(‘source) .agg(sum("pm") as "pm") .select(‘source, lit(null) as "year", ‘pm) groupPostAndYear.union(groupPost) .sort(‘source, ‘year asc_nulls_last, ‘pm) .show()
大家其实也能看出来, 在一个数据集中又小计又总计, 可能需要多个操作符, 如何简化呢? 请看下面
-
rollup
操作符 -
rollup
操作符其实就是groupBy
的一个扩展,rollup
会对传入的列进行滚动groupBy
,groupBy
的次数为列数量1
, 最后一次是对整个数据集进行聚合- Step 1: 创建数据集
-
import org.apache.spark.sql.functions._ val sales = Seq( ("Beijing", 2016, 100), ("Beijing", 2017, 200), ("Shanghai", 2015, 50), ("Shanghai", 2016, 150), ("Guangzhou", 2017, 50) ).toDF("city", "year", "amount")
-
Step 1:
rollup
的操作 -
sales.rollup("city", "year") .agg(sum("amount") as "amount") .sort($"city".desc_nulls_last, $"year".asc_nulls_last) .show() /** * 结果集: * --------- ---- ------ * | city|year|amount| * --------- ---- ------ * | Shanghai|2015| 50| <-- 上海 2015 的小计 * | Shanghai|2016| 150| * | Shanghai|null| 200| <-- 上海的总计 * |Guangzhou|2017| 50| * |Guangzhou|null| 50| * | Beijing|2016| 100| * | Beijing|2017| 200| * | Beijing|null| 300| * | null|null| 550| <-- 整个数据集的总计 * --------- ---- ------ */
- Step 2: 如果使用基础的 groupBy 如何实现效果?
-
val cityAndYear = sales .groupBy("city", "year") // 按照 city 和 year 聚合 .agg(sum("amount") as "amount") val city = sales .groupBy("city") // 按照 city 进行聚合 .agg(sum("amount") as "amount") .select($"city", lit(null) as "year", $"amount") val all = sales .groupBy() // 全局聚合 .agg(sum("amount") as "amount") .select(lit(null) as "city", lit(null) as "year", $"amount") cityAndYear .union(city) .union(all) .sort($"city".desc_nulls_last, $"year".asc_nulls_last) .show() /** * 统计结果: * --------- ---- ------ * | city|year|amount| * --------- ---- ------ * | Shanghai|2015| 50| * | Shanghai|2016| 150| * | Shanghai|null| 200| * |Guangzhou|2017| 50| * |Guangzhou|null| 50| * | Beijing|2016| 100| * | Beijing|2017| 200| * | Beijing|null| 300| * | null|null| 550| * --------- ---- ------ */
很明显可以看到, 在上述案例中,
rollup
就相当于先按照city
,year
进行聚合, 后按照city
进行聚合, 最后对整个数据集进行聚合, 在按照city
聚合时,year
列值为null
, 聚合整个数据集的时候, 除了聚合列, 其它列值都为null
-
使用
rollup
完成pm
值的统计 -
上面的案例使用
rollup
来实现会非常的简单import org.apache.spark.sql.functions._ pmFinal.rollup(‘source, ‘year) .agg(sum("pm") as "pm_total") .sort(‘source.asc_nulls_last, ‘year.asc_nulls_last) .show()
-
cube
-
cube
的功能和rollup
是一样的, 但也有区别, 区别如下-
rollup(A, B).sum©
其结果集中会有三种数据形式:
A B C
,A null C
,null null C
不知道大家发现没, 结果集中没有对
B
列的聚合结果 -
cube(A, B).sum©
其结果集中会有四种数据形式:
A B C
,A null C
,null null C
,null B C
不知道大家发现没, 比
rollup
的结果集中多了一个null B C
, 也就是说,rollup
只会按照第一个列来进行组合聚合, 但是cube
会将全部列组合聚合
import org.apache.spark.sql.functions._ pmFinal.cube(‘source, ‘year) .agg(sum("pm") as "pm_total") .sort(‘source.asc_nulls_last, ‘year.asc_nulls_last) .show() /** * 结果集为 * * ------- ---- --------- * | source|year| pm_total| * ------- ---- --------- * | dongsi|2013| 735606.0| * | dongsi|2014| 745808.0| * | dongsi|2015| 752083.0| * | dongsi|null|2233497.0| * |us_post|2010| 841834.0| * |us_post|2011| 796016.0| * |us_post|2012| 750838.0| * |us_post|2013| 882649.0| * |us_post|2014| 846475.0| * |us_post|2015| 714515.0| * |us_post|null|4832327.0| * | null|2010| 841834.0| <-- 新增 * | null|2011| 796016.0| <-- 新增 * | null|2012| 750838.0| <-- 新增 * | null|2013|1618255.0| <-- 新增 * | null|2014|1592283.0| <-- 新增 * | null|2015|1466598.0| <-- 新增 * | null|null|7065824.0| * ------- ---- --------- */
-
-
SparkSQL
中支持的SQL
语句实现cube
功能 -
SparkSQL
支持GROUPING SETS
语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现cube
也可以实现rollup
的功能pmFinal.createOrReplaceTempView("pm_final") spark.sql( """ |select source, year, sum(pm) |from pm_final |group by source, year |grouping sets((source, year), (source), (year), ()) |order by source asc nulls last, year asc nulls last """.stripMargin) .show()
-
RelationalGroupedDataset
-
常见的
RelationalGroupedDataset
获取方式有三种-
groupBy
-
rollup
-
cube
无论通过任何一种方式获取了
RelationalGroupedDataset
对象, 其所表示的都是是一个被分组的DataFrame
, 通过这个对象, 可以对数据集的分组结果进行聚合val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year)
需要注意的是,
RelationalGroupedDataset
并不是DataFrame
, 所以其中并没有DataFrame
的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成DataFrame
对象, 然后就可以再次使用DataFrame
的算子进行操作了操作符 解释 avg
求平均数
count
求总数
max
求极大值
min
求极小值
mean
求均数
sum
求和
agg
聚合, 可以使用
sql.functions
中的函数来配合进行操作pmDF.groupBy(‘year) .agg(avg(‘pm) as "pm_avg")
-
11. 连接
导读-
无类型连接
join
-
连接类型
Join Types
-
无类型连接算子
join
的API
-
- Step 1: 什么是连接
-
按照 PostgreSQL 的文档中所说, 只要能在一个查询中, 同一时间并发的访问多条数据, 就叫做连接.
做到这件事有两种方式
-
一种是把两张表在逻辑上连接起来, 一条语句中同时访问两张表
select * from user join address on user.address_id = address.id
-
还有一种方式就是表连接自己, 一条语句也能访问自己中的多条数据
select * from user u1 join (select * from user) u2 on u1.id = u2.id
-
-
Step 2:
join
算子的使用非常简单, 大致的调用方式如下 -
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
- Step 3: 简单连接案例
-
表结构如下
--- ------ ------ --- --------- | id| name|cityId| | id| name| --- ------ ------ --- --------- | 0| Lucy| 0| | 0| Beijing| | 1| Lily| 0| | 1| Shanghai| | 2| Tim| 2| | 2|Guangzhou| | 3|Danial| 0| --- --------- --- ------ ------
如果希望对这两张表进行连接, 首先应该注意的是可以连接的字段, 比如说此处的左侧表
cityId
和右侧表id
就是可以连接的字段, 使用join
算子就可以将两个表连接起来, 进行统一的查询val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0)) .toDF("id", "name", "cityId") val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou")) .toDF("id", "name") person.join(cities, person.col("cityId") === cities.col("id")) .select(person.col("id"), person.col("name"), cities.col("name") as "city") .show() /** * 执行结果: * * --- ------ --------- * | id| name| city| * --- ------ --------- * | 0| Lucy| Beijing| * | 1| Lily| Beijing| * | 2| Tim|Guangzhou| * | 3|Danial| Beijing| * --- ------ --------- */
- Step 4: 什么是连接?
-
现在两个表连接得到了如下的表
--- ------ --------- | id| name| city| --- ------ --------- | 0| Lucy| Beijing| | 1| Lily| Beijing| | 2| Tim|Guangzhou| | 3|Danial| Beijing| --- ------ ---------
通过对这张表的查询, 这个查询是作用于两张表的, 所以是同一时间访问了多条数据
spark.sql("select name from user_city where city = ‘Beijing‘").show() /** * 执行结果 * * ------ * | name| * ------ * | Lucy| * | Lily| * |Danial| * ------ */
- 连接类型
-
如果要运行如下代码, 需要先进行数据准备
private val spark = SparkSession.builder() .master("local[6]") .appName("aggregation") .getOrCreate() import spark.implicits._ val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3)) .toDF("id", "name", "cityId") person.createOrReplaceTempView("person") val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou")) .toDF("id", "name") cities.createOrReplaceTempView("cities")
连接类型 类型字段 解释 交叉连接
cross
- 解释
-
交叉连接就是笛卡尔积, 就是两个表中所有的数据两两结对
交叉连接是一个非常重的操作, 在生产中, 尽量不要将两个大数据集交叉连接, 如果一定要交叉连接, 也需要在交叉连接后进行过滤, 优化器会进行优化
-
SQL
语句 -
select * from person cross join cities
-
Dataset
操作 -
person.crossJoin(cities) .where(person.col("cityId") === cities.col("id")) .show()
内连接
inner
- 解释
-
内连接就是按照条件找到两个数据集关联的数据, 并且在生成的结果集中只存在能关联到的数据
-
SQL
语句 -
select * from person inner join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "inner") .show()
全外连接
outer
,full
,fullouter
- 解释
-
内连接和外连接的最大区别, 就是内连接的结果集中只有可以连接上的数据, 而外连接可以包含没有连接上的数据, 根据情况的不同, 外连接又可以分为很多种, 比如所有的没连接上的数据都放入结果集, 就叫做全外连接
-
SQL
语句 -
select * from person full outer join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "full") // "outer", "full", "full_outer" .show()
左外连接
leftouter
,left
- 解释
-
左外连接是全外连接的一个子集, 全外连接中包含左右两边数据集没有连接上的数据, 而左外连接只包含左边数据集中没有连接上的数据
-
SQL
语句 -
select * from person left join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left") // leftouter, left .show()
LeftAnti
leftanti
- 解释
-
LeftAnti
是一种特殊的连接形式, 和左外连接类似, 但是其结果集中没有右侧的数据, 只包含左边集合中没连接上的数据 -
SQL
语句 -
select * from person left anti join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left_anti") .show()
LeftSemi
leftsemi
- 解释
-
和
LeftAnti
恰好相反,LeftSemi
的结果集也没有右侧集合的数据, 但是只包含左侧集合中连接上的数据 -
SQL
语句 -
select * from person left semi join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "left_semi") .show()
右外连接
rightouter
,right
- 解释
-
右外连接和左外连接刚好相反, 左外是包含左侧未连接的数据, 和两个数据集中连接上的数据, 而右外是包含右侧未连接的数据, 和两个数据集中连接上的数据
-
SQL
语句 -
select * from person right join cities on person.cityId = cities.id
-
Dataset
操作 -
person.join(right = cities, joinExprs = person("cityId") === cities("id"), joinType = "right") // rightouter, right .show()
- [扩展] 广播连接
-
-
Step 1: 正常情况下的
Join
过程 -
Join
会在集群中分发两个数据集, 两个数据集都要复制到Reducer
端, 是一个非常复杂和标准的ShuffleDependency
, 有什么可以优化效率吗? -
Step 2:
Map
端Join
-
前面图中看的过程, 之所以说它效率很低, 原因是需要在集群中进行数据拷贝, 如果能减少数据拷贝, 就能减少开销
如果能够只分发一个较小的数据集呢?
可以将小数据集收集起来, 分发给每一个
Executor
, 然后在需要Join
的时候, 让较大的数据集在Map
端直接获取小数据集, 从而进行Join
, 这种方式是不需要进行Shuffle
的, 所以称之为Map
端Join
-
Step 3:
Map
端Join
的常规实现 -
如果使用
RDD
的话, 该如何实现Map
端Join
呢?val personRDD = spark.sparkContext.parallelize(Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3))) val citiesRDD = spark.sparkContext.parallelize(Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))) val citiesBroadcast = spark.sparkContext.broadcast(citiesRDD.collectAsMap()) val result = personRDD.mapPartitions( iter => { val citiesMap = citiesBroadcast.value // 使用列表生成式 yield 生成列表 val result = for (person <- iter if citiesMap.contains(person._3)) yield (person._1, person._2, citiesMap(person._3)) result } ).collect() result.foreach(println(_))
-
Step 4: 使用
Dataset
实现Join
的时候会自动进行Map
端Join
-
自动进行
Map
端Join
需要依赖一个系统参数spark.sql.autoBroadcastJoinThreshold
, 当数据集小于这个参数的大小时, 会自动进行Map
端Join
如下, 开启自动
Join
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toInt / 1024 / 1024) println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)
当关闭这个参数的时候, 则不会自动 Map 端 Join 了
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)
- Step 5: 也可以使用函数强制开启 Map 端 Join
-
在使用 Dataset 的 join 时, 可以使用 broadcast 函数来实现 Map 端 Join
import org.apache.spark.sql.functions._ spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) println(person.crossJoin(broadcast(cities)).queryExecution.sparkPlan.numberedTreeString)
即使是使用 SQL 也可以使用特殊的语法开启
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) val resultDF = spark.sql( """ |select /* MAPJOIN (rt) */ * from person cross join cities rt """.stripMargin) println(resultDF.queryExecution.sparkPlan.numberedTreeString)
-
Step 1: 正常情况下的