spark点点滴滴 —— 认识spark sql的DataFrame和DataSet

时间:2022-08-14 22:26:34

概述

spark的DataFrames和DataSets是spark SQL中的关键概念,相比于RDD,DataFrame更能描述数据类型,因此是spark sql的基础类型,同时在spark 2.0.x及其以后的版本中,spark的机器学习也会逐渐替换成基于DataFrame的api,所有我们有必要了解spark的DataFrame相关概念。
spark sql在spark框架中的位置:
spark点点滴滴 —— 认识spark sql的DataFrame和DataSet
我们可以看到,spark sql是建立在spark框架之上的一个大数据关系型数据处理系统,和impala类似。
其核心部分有两个:DataFrame API和Catalyst引擎,SparkSQL的优化器系统Catalyst和大多数当前的大数据SQL处理引擎设计基本相同(Impala、Presto、Hive(Calcite)等),本篇我们重点看看DataFrame的基本原理和使用。

DataFrame基本原理

和RDD的比较

我们知道,RDD是spark早期很重要的一个概念,是数据的immutable distributed的集合,由不同节点上的partition组成。DataFrame和RDD类似,也是数据的不可变分布式集合。不同的是,数据被组织成带名字的列,就像关系型数据库中的表。是一种有结构的高级别抽象,与之相应的提供了一种领域特定语言(DSL)API来操作这些分布式数据。
spark点点滴滴 —— 认识spark sql的DataFrame和DataSet
DataFrame直观上很像是RDDs的加强版,它和RDDs在数据存储上最大的区别就在于,DataFrame是有Schema的,通俗的讲,就是上图中蓝色框住的那个表头。不要小看这一点,对于复杂的数据类型,DataFrame的这种结构可以使编程大大简化。

在spark2.0后,DataFrame的API和DataSet的API合并统一了,现在只需要处理DataSet相关API即可。
spark点点滴滴 —— 认识spark sql的DataFrame和DataSet

DataSet

从spark 2.0开始,两种独立的API特点:strongly-typed API 和untyped API。从概念上来说,将DataFrame作为 一般对象Dataset[Row]的集合的别名,而DataSet是strongly-typed JVM对象的集合,即java和scala中的类。
不同语言对应抽象类型如下:

Language Main Abstraction
Scala Dataset[T] & DataFrame (alias for Dataset[Row])
Java Dataset[T]
Python* DataFrame
R* DataFrame

为什么用DataFrame

在RDDs中,数据集中的每项数据都是一个整体,因为你无法得知其内部的结构,这也就使得你对数据项的操作能力很弱,当你想获得数据项内部的部分信息的时候,你需要手动将object按照你预先设定的数据格式进行分割,麻烦,且容易出错。而使用DataFrame,意味着你可以直接获得数据项的内部数据结构,并且由于DataFrame的Schema的存在,数据项的转换也都将是类型安全的,这对于较为复杂的数据计算程序的调试是十分有利的,很多数据类型不匹配的问题都可以在编译阶段就被检查出来,而对于不合法的数据文件,DataFrame也具备一定分辨能力。

而另一方面,我们注意到,当RDD被切割出“列”并加上“表头”变成DataFrame之后,就意味着DataFrame要支持比RDD更加细粒度的查询,而这种Table式的结构,很容易就可以让我们联想到数据库中数据表,同时DataFrame API也支持使用者对DataFrame进行数据库那样的关联、聚合、筛选等查询操作。
对DataFrame的查询,都是在RDDs数据集上做得直接映射,。你可以把DataFrame中的数据,理解为“逻辑数据”,而“物理数据”实际上在RDDs中。

DataFrame性能

为了能够更快更好的定位到数据,甚至于更好的利用内存与磁盘中的存储空间,DataFrame中的数据在内存和磁盘中的排列也必须更为考究,才能够在不损失性能的前提下提供这些操作。Spark SQL团队给出的方案是:按列压缩存储。
而列式存储则是将行拆开,将一列的数据放在一起,同时不同列可以存放在不同的位置(由于天然利于纵向分表,所以在超大数据集的存储上,列式存储也具有一定优势)。通常情况下,我们查询一个数据并不需要检查一行数据中的每个列条目,但是在行式存储中,必须要扫描全部数据集才能够筛选出我们想要的那条数据,既然我们检索的项目很可能只是“Id”一项而已,那为什么要去管其他列呢?特别是在磁盘上,磁头访问数据的方式是线性的,如果只想根据“Id”进行筛选,即便只是上面那个只有两列的数据表,磁头移动的距离也要超过列式存储的好几倍。不过相应的,列式存储中“更新”“插入“”查询“等操作会比较麻烦,但是由于DataFrame和RDDs一样都是Immutable的,所以恰好规避了这一问题。
最终,DataFrame API中这种支持对列进行访问的形式,要比RDDs API的数据访问粒度更为细腻,这也就意味着数据工程师可以根据“列”的性质,来为列建立索引,从而避免遍历所有的数据项。

DataSet的操作

我们先使用spark-shell来简单看下DataSet的结构:
我们将json数据文件people.json上传到hdfs上/XXX/spark/people.json,内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

在spark-shell中执行结果如下spark点点滴滴 —— 认识spark sql的DataFrame和DataSet
更多DataSet上的api可以参考这里

如何构建DataSet

和class类对应转换

在spark-shell上执行效果如下图:
spark点点滴滴 —— 认识spark sql的DataFrame和DataSet

从RDD转换

构造文件people.txt,内容如下:

Michael, 29
Andy, 30
Justin, 19

上次到hdfs上,执行效果:
spark点点滴滴 —— 认识spark sql的DataFrame和DataSet

编程指定

scala代码如下:

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

【参考资料】A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets
【参考资料】进化的Spark, 从DataFrame说起