一。
从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。
下面我们就介绍如何使用SparkSession来创建DataFrame。
请进入Linux系统,打开“终端”,进入Shell命令提示符状态。
首先,请找到样例数据。 Spark已经为我们提供了几个样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。
people.json文件的内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
people.txt文件的内容如下:
Michael, 29
Andy, 30
Justin, 19
下面我们就介绍如何从people.json文件中读取数据并生成DataFrame并显示数据(从people.txt文件生成DataFrame需要后面将要介绍的另外一种方式)。
请使用如下命令打开pyspark:
cd /usr/local/spark
./bin/pyspark
进入到pyspark状态后执行下面命令:
>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
现在,我们可以执行一些常用的DataFrame操作。
// 打印模式信息
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// 选择多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
// 条件过滤
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// 分组聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//对列进行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+
二。由RDD转换到DataFrame。
Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
1.利用反射机制推断RDD模式
>>> from pyspark.sql.types import Row
>>> def f(x):
... rel = {}
... rel['name'] = x[0]
... rel['age'] = x[1]
... return rel
...
>>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people") //必须注册为临时表才能供下面的查询使用 >>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print) Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy
2.使用编程方式定义RDD模式
>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType //生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") //定义一个模式字符串
>>> schemaString = "name age" //根据模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段 >>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1])) >>> peopleDF = spark.createDataFrame(rowRDD, schema) //必须注册为临时表才能供下面查询使用
scala> peopleDF.createOrReplaceTempView("people") >>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print) name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19
三。保存成文件
>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json"
>>> peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")