spark 使用Python编写 - 王大拿

时间:2024-02-22 22:55:42

spark 使用Python编写

常见的方法

由于Scala才刚刚开始学习,还是对python更为熟悉,因此在这记录一下自己的学习过程,主要内容来自于spark的官方帮助文档,这一节的地址为:

http://spark.apache.org/docs/latest/quick-start.html

文章主要是翻译了文档的内容,但也在里边加入了一些自己在实际操作中遇到的问题及解决的方案,和一些补充的小知识,一起学习。

环境:Ubuntu 16.04 LTS,Spark 2.0.1, Hadoop 2.7.3, Python 3.5.2,

 

利用spark shell进行交互式分析

1. 基础

首先打开spark与python交互的API

$ cd /usr/local/spark
$ ./bin/pyspark

Spark最重要的一个概念就是RDD(Resilient Distributed Dataset),弹性分布式数据集。RDD可以利用Hadoop的InputFormats创建,或者从其他RDD转换。

这里,作为入门,我们利用spark安装后文件夹中自带的README.md(此文件位置为/usr/local/spark/README.md)文件作为例子,学习如何创建一个新的RDD。

创建新的RDD:

>>> textFile = sc.textFile(“README.md”)

 

RDD支持两种类型的操作,actions和transformations:

actions: 在数据集上运行计算后返回值

transformations: 转换, 从现有数据集创建一个新的数据集

 RDD可以有执行一系列的动作(actions),这些动作可以返回值(values),转换(transformations),或者指向新的RDD的指针。下边学习RDD的一些简单的动作:

>>> textFile.count()  # 计数,返回RDD中items的个数,这里就是README.md的总行# 数
99
>>> textFile.first()  # RDD中的第一个item,这里就是文件README.md的第一行
u\'# Apache Spark\'

注意:如果之前是从/usr/local/spark启动pyspark,然后读取README.md文件的,如果执行count语句,会出现以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/spark/README.md

这是因为在使用相对路径时,系统默认是从hdfs://localhost:9000/目录下读取README.md文件的,但是README.md文件并不在这一目录下,所以sc.textFile()必须使用绝对路径,此时代码修改为:

>>> textFile = sc.textFile(“file:///usr/local/spark/README.md”)
99

下边尝试使用一个转换(transformation)。例如,使用filter这一转换返回一个新的RDD,这些RDD中的items都含有“Spark”字符串。

>>> linesWithSpark = textFile.filter(lambda line: “Spark” in line)

我们还可以将actions和transformation链接起来:

>>> textFile.filter(lambda line: “Spark” in line).count()  # 有多好行含有“Spark”这一字符串
19

 

2. 更多的RDD操作

利用RDD的动作和转换能够完成很多复杂的计算。例如,我们希望找到含有最后单词的一句话:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b)
22

这个语句中,map函数将len(line.split())这一语句在所有line上执行,返回每个line所含有的单词个数,也就是将line都map到一个整数值,然后创建一个新的RDD。然后调用reduce,找到最大值。map和reduce函数里的参数是python中的匿名函数(lambda),事实上,我们这里也可以传递python中更顶层的函数。比如,我们先定义一个比较大小的函数,这样我们的代码会更容易理解:

复制代码
>>> def max(a, b):
. . .     if a > b:
. . .         return a
. . .     else:
. . .         return b
. . .
>>> textFile.map(lambda line: len(line.split())).reduce(max)
22
复制代码

Hadoop掀起了MapReduce的热潮。在spark中,能够更加容易的实现MapReduce

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

上述语句中,利用flatMap, map和reduceByKey三个转换,计算文件README.md中每个单词出现的个数,并返回一个新的RDD,每个item的格式为(string, int),即单词和对应的出现次数。其中,

flatMap(func):与map相似,但是每个输入的item能够被map到0个或者更多的输出items上,也就是说func的返回值应当是一个Seq,而不是一个单独的item,上述语句中,匿名函数返回的就是一句话中所含的每个单词

reduceByKey(func):可以作用于使用“键-值”(K, V)形式存储的数据集上并返回一组新的数据集(K, V),其中,每个键的值为聚合使用func操作的结果,这里相当于python中字典的含义。上述语句中,相当于当某个单词出现一次时,就在这个单词的出现次数上加1,每个单词就是一个Key,reducByKey中的匿名函数计算单词的出现次数。

 

要收集上述语句的计算结果,可以使用collect这一动作:

>>> wordCounts.collect()
[(u\'when\', 1), (u\'R,\', 1), (u\'including\', 3), (u\'computation\', 1), ...]

 

3. 缓存Caching

Spark也支持将数据集存入集群范围的内存缓存中。这对于需要进行重复访问的数据非常有用,比如我们需要在一个小的数据集中执行查询操作,或者需要执行一个迭代算法(例如PageRank)。下面,利用之前命令中得到的linesWithSpark数据集,演示缓存这一操作过程:

>>> linesWithSpark.cache()
PythonRDD[26] at RDD at PythonRDD.scala:48
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19

利用Spark去缓存一个100行的文件可能并没什么意义。但是有趣的是,这一系列的操作可以用于非常大的数据集上,甚至含有成千上万的节点的数据集。

 

4. 自含式应用程序(self-contained applications)

假设我们希望利用Spark API写一个自含式应用程序,我们可以利用Scala,Java或者Python完成。

下边,简单介绍一下怎样利用Python API (PySpark)写一个应用程序,命名为SimpleApp.py.

 

在spark所在目录下输入:

./bin/spark-submit --master local[4] SimpleApp.py

输出为:

Lines with a: 61, Lines with b: 27

 

此外,Spark自带很多例子,可以在spark目录下输入下列指令查看:

复制代码
# For Scala and Java, use run-example:

./bin/run-example SparkPi

# For Python examples, use spark-submit directly:

./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:

./bin/spark-submit examples/src/main/r/dataframe.R
复制代码

 使用py文件来处理

一、创建RDD

在pyspark中创建RDD有两种方法,一种是并行化一个列表,或者直接读取文件

但是所有工作的前提是初始化你的SparkSession,

spark2将SparkConf、SparkConText、SQLContext和HiveContext和StreamingContext进行了组合

import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
spark=SparkSession.builder.appName("lz").getOrCreate()
sc = SparkContext.getOrCreate()
\'\'\'
SPARK1的写法如下
conf = SparkConf().setMaster("local[10]").setAppName("PySparkShell")
sc = SparkContext.getOrCreate()
sqlContest = SQLContext(sc)
\'\'\'

 


a,并行化一个集合

data = sc.parallelize(
[(\'Amber\', 22), (\'Alfred\', 23), (\'Skye\',4), (\'Albert\', 12), 
(\'Amber\', 9)])


b,读取文件

data_from_file = sc.\
textFile(
\'/Users/drabast/Documents/PySpark_Data/VS14MORT.txt.gz\', 

4)
RDD创建好之后,我如果想查看里面内容如何操作:

比如在之前序列化了一个data

如果是在python中那么我们直接输入data 那么这数据就呈现出来了,在pyspark中是什么样子?

 

如何展现?

1)小数据量的情况加,可以直接将分布式的RDD通过转换函数collect()转换成成一个数组

 data.collect()

 

2)如果是大数据量的rdd,那么我们可以用take( n)来选取n个数据查看

print(data.take(3))

 

二、RDD转换sparkDataFrame

我们知道python的DataFrame是一个数据框,其实spark的DataFrame也是这么一个概念

RDD是无schema的数据结构

DataFrame是有schema的数据结构

RDD想转换成DataFrame就是在RDD基础上加上schema,如果没有提前定义好schema的名称,转化过程中默认schema为:_1,_2,_3

RDD转换成DataFrame:

data1 = spark.createDataFrame(data)

data2 = sqlContest.createDataFrame(data)

同样 dataframe也可以转换成rdd: rdd.map(lambda x: -----)


---------------------
作者:仙人掌_lz
来源:CSDN
原文:https://blog.csdn.net/qq_36603091/article/details/79588072
版权声明:本文为博主原创文章,转载请附上博文链接!