1.概述
Spark SQL 是 Spark 处理结构化数据的一个模块。与基础的 Spark RDD API 不同,Spark SQL 提供了查询结构化数据及计算结果等信息的接口。在内部,Spark SQL 使用这个额外的信息去执行额外的优化。有几种方式可以跟 Spark SQL 进行交互,包括 SQL 和 Dataset API。当使用相同执行引擎进行计算时,无论使用哪种 API / 语言都可以快速的计算。这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的 。
2.SQL
Spark SQL 的功能之一是执行 SQL 查询。Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据,不过需要进行hive配置(关于这部分不再我们讨论范围内,请自行百度)。Spark SQL也可以直接从本地读取文件。
3.Datasets 和 DataFrames
一个 Dataset 是一个分布式的数据集合。Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与 Spark SQL 优化的执行引擎的好处。一个 Dataset 可以从 JVM 对象来构造并且使用转换功能(map,flatMap,filter,等等)。Dataset API 在 Scala 和 Java 中是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多 Dataset API 的有点已经可用了(也就是说,你可能通过 name 天生的 row.columnName 属性访问一行中的字段)。这种情况和 R 相似。
一个 DataFrame 是一个 Dataset 组织成的指定列。它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的,但是有更多的优化。DataFrame 可以从大量的 Source 中构造出来,像 : 结构化的数据文件,Hive 中的表,外部的数据库,或者已存在的 RDD。DataFrame API 在 Scala,Java,Python 和 R 中是可用的。在 Scala 和 Java 中,一个 DataFrame 所代表的是一个多个 Row(行)的 Dataset。在 Scala API 中,DataFrame 仅仅是一个 Dataset[Row] 类型的别名 。然而,在 Java API 中,用户需要去使用 Dataset 来表示 DataFrame。
4.SparkSession-Spark的一个全新的切入点
在Spark的早期版本,sparkContext是进入Spark的切入点。我们都知道RDD是Spark中重要的API,然而它的创建和操作得使用sparkContext提供的API;对于RDD之外的其他东西,我们需要使用其他的Context。比如对于流处理来说,我们得使用StreamingContext;对于SQL得使用sqlContext;而对于hive得使用HiveContext。然而DataSet和Dataframe提供的API逐渐称为新的标准API,我们需要一个切入点来构建它们,所以在 Spark 2.0中我们引入了一个新的切入点(entry point):SparkSession
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
5.进入Sparksession
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL Example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
实际上spark-shell本身已经开启了sparkSession,命名为spark,我们可以直接使用
6.创建 DataFrames
与一个 SparkSession 一起,应用程序可以从一个 已存在的 RDD,或者一个 Hive 表中,或者从 Spark 数据源 中创建 DataFrame。
举个例子,下面基于一个 JSON 文件的内容创建一个 DataFrame :
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。
7.无类型 Dataset 操作(aka DataFrame 操作)
DataFrame 提供了一个 DSL(domain-specific language)用于在 Scala,Java,Python 或者 R 中的结构化数据操作。
正如上面提到的一样,在Spark 2.0 ,Scala 和 JavaAPI 中的 DataFrame 仅仅是多个 Row(行)的 Dataset 。这些操作也参考了与强类型的 Scala/Java Datasets 的 “类型转换” 相对应的 “无类型转换”。
这里包括一些使用 Dataset 进行结构化数据处理的示例 :
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。
8.以编程的方式运行 SQL 查询
// SparkSession 使应用程序的 SQL 函数能够以编程的方式运行 SQL 查询并且将查询结果以一个 DataFrame。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。
9.创建 Datasets
Dataset 与 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo,他们使用一个指定的 Encoder(编码器) 来序列化用于处理或者通过网络进行传输的对象。虽然编码器和标准的序列化都负责将一个对象序列化成字节,编码器是动态生成的代码,并且使用了一种允许 Spark 去执行许多像 filtering,sorting 以及 hashing 这样的操作,不需要将字节反序列化成对象的格式。
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。
10.数据源(读写操作)
spark SQL 支持通过 DataFrame 接口对各种 data sources (数据源)进行操作. DataFrame 可以使用 relational transformations (关系转换)操作, 也可用于创建 temporary view (临时视图). 将 DataFrame 注册为 temporary view (临时视图)允许您对其数据运行 SQL 查询. 本节 描述了使用 Spark Data Sources 加载和保存数据的一般方法, 然后涉及可用于 built-in data sources (内置数据源)的 specific options (特定选项).
11.通用的 Load/Save 函数
在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
或者
val df = sparkSession.read.option("header","true").
csv("examples/src/main/resources/sales.csv")
12.手动指定选项
你也可以手动的指定数据源,并且将与你想要传递给数据源的任何额外选项一起使用。数据源由其完全限定名指定(例如 : org.apache.spark.sql.parquet),不过对于内置数据源你也可以使用它们的缩写名(json, parquet, jdbc)。使用下面这个语法可以将从任意类型数据源加载的DataFrames 转换为其他类型。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("parquet").save("namesAndAges.parquet")
完整的示例代码在
“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”。
13.直接在文件上运行 SQL
不使用读取 API 将文件加载到 DataFrame 并进行查询, 也可以直接用 SQL 查询该文件.
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
14.Parquet Files
Parquet 是许多其他数据处理系统支持的 columnar format (柱状格式). Spark SQL 支持读写 Parquet 文件, 可自动保留 schema of the original data (原始数据的模式). 当编写 Parquet 文件时, 出于兼容性原因, 所有 columns 都将自动转换为可空.
Loading Data Programmatically (以编程的方式加载数据)
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
参考博客:
操作技巧:将 Spark 中的文本转换为 Parquet 以提升性能
https://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html