初识spark,需要对其API有熟悉的了解才能方便开发上层应用。本文用图形的方式直观表达相关API的工作特点,并提供了解新的API接口使用的方法。例子代码全部使用python实现。
1. 数据源准备准备输入文件:
$ cat /tmp/in apple bag bag cat cat cat启动pyspark:
$ ./spark/bin/pyspark使用textFile创建RDD:
>>> txt = sc.textFile("file:///tmp/in", 2)查看RDD分区与数据:
>>> txt.glom().collect() [[u‘apple‘, u‘bag bag‘], [u‘cat cat cat‘]] 2. transformation flatMap处理RDD的每一行,,一对多映射。
代码示例:
>>> txt.flatMap(lambda line: line.split()).collect() [u‘apple‘, u‘bag‘, u‘bag‘, u‘cat‘, u‘cat‘, u‘cat‘]示意图:
map处理RDD的每一行,一对一映射。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect() [(u‘apple‘, 1), (u‘bag‘, 1), (u‘bag‘, 1), (u‘cat‘, 1), (u‘cat‘, 1), (u‘cat‘, 1)]示意图:
filter处理RDD的每一行,过滤掉不满足条件的行。
代码示例:
>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !=‘bag‘).collect() [u‘apple‘, u‘cat‘, u‘cat‘, u‘cat‘]示意图:
mapPartitions逐个处理每一个partition,使用迭代器it访问每个partition的行。
代码示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect() [3, 3]示意图:
mapPartitionsWithIndex逐个处理每一个partition,使用迭代器it访问每个partition的行,index保存partition的索引,等价于mapPartitionsWithSplit(过期函数)。
代码示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect() [0, 1]示意图:
sample根据采样因子指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。第一个参数表示是否放回抽样,第二个参数表示抽样比例,第三个参数表示随机数seed。
代码示例:
>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect() [u‘bag‘, u‘bag‘, u‘cat‘, u‘cat‘]示意图:
union合并RDD,不去重。
代码示例:
>>> txt.union(txt).collect() [u‘apple‘, u‘bag bag‘, u‘cat cat cat‘, u‘apple‘, u‘bag bag‘, u‘cat cat cat‘]示意图:
distinct对RDD去重。
代码示例:
>>> txt.flatMap(lambda line: line.split()).distinct().collect() [u‘bag‘, u‘apple‘, u‘cat‘]示意图:
groupByKey在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect() [(u‘bag‘, <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u‘apple‘, <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u‘cat‘, <pyspark.resultiterable.ResultIterable object at 0x13234d0>)] >>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data [1, 1]示意图:
reduceByKey在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。