图解Spark API

时间:2022-03-31 07:42:16

初识spark,需要对其API有熟悉的了解才能方便开发上层应用。本文用图形的方式直观表达相关API的工作特点,并提供了解新的API接口使用的方法。例子代码全部使用python实现。

1. 数据源准备

准备输入文件:

$ cat /tmp/in apple bag bag cat cat cat

启动pyspark:

$ ./spark/bin/pyspark

使用textFile创建RDD:

>>> txt = sc.textFile("file:///tmp/in", 2)

查看RDD分区与数据:

>>> txt.glom().collect() [[u‘apple‘, u‘bag bag‘], [u‘cat cat cat‘]] 2. transformation flatMap

处理RDD的每一行,,一对多映射。

代码示例:

>>> txt.flatMap(lambda line: line.split()).collect() [u‘apple‘, u‘bag‘, u‘bag‘, u‘cat‘, u‘cat‘, u‘cat‘]

示意图:

map

处理RDD的每一行,一对一映射。

代码示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect() [(u‘apple‘, 1), (u‘bag‘, 1), (u‘bag‘, 1), (u‘cat‘, 1), (u‘cat‘, 1), (u‘cat‘, 1)]

示意图:

filter

处理RDD的每一行,过滤掉不满足条件的行。

代码示例:

>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !=‘bag‘).collect() [u‘apple‘, u‘cat‘, u‘cat‘, u‘cat‘]

示意图:

mapPartitions

逐个处理每一个partition,使用迭代器it访问每个partition的行。

代码示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect() [3, 3]

示意图:

mapPartitionsWithIndex

逐个处理每一个partition,使用迭代器it访问每个partition的行,index保存partition的索引,等价于mapPartitionsWithSplit(过期函数)。

代码示例:

>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect() [0, 1]

示意图:

sample

根据采样因子指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。第一个参数表示是否放回抽样,第二个参数表示抽样比例,第三个参数表示随机数seed。

代码示例:

>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect() [u‘bag‘, u‘bag‘, u‘cat‘, u‘cat‘]

示意图:

union

合并RDD,不去重。

代码示例:

>>> txt.union(txt).collect() [u‘apple‘, u‘bag bag‘, u‘cat cat cat‘, u‘apple‘, u‘bag bag‘, u‘cat cat cat‘]

示意图:

distinct

对RDD去重。

代码示例:

>>> txt.flatMap(lambda line: line.split()).distinct().collect() [u‘bag‘, u‘apple‘, u‘cat‘]

示意图:

groupByKey

在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。

代码示例:

>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect() [(u‘bag‘, <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u‘apple‘, <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u‘cat‘, <pyspark.resultiterable.ResultIterable object at 0x13234d0>)] >>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data [1, 1]

示意图:

reduceByKey

在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。