因为Spark是用Scala实现的,所以Spark天生支持Scala API。此外,还支持Java和Python API。 以Spark 1.3版本号的Python API为例。其模块层级关系例如以下图所看到的:
从上图可知,pyspark是Python API的顶层package,它包括了几个重要的subpackages。当中:
1) pyspark.SparkContext
它抽象了指向spark集群的一条连接,可用来创建RDD对象,,它是API的主入口。
通过它可在提交的应用代码中动态创建spark app的配置并作为conf參数传给pyspark.SparkContext实例的构造函数。
若未动态创建conf。则pyspark.SparkContext实例从conf/spark-defaults.conf读取默认的全局配置。
3) pyspark.RDD
RDDs can be stored in memory between queries without requiring replication. Instead, they rebuild lost data on failure using lineage: each RDD remembers how it was built from other datasets (by transformations like
map, join or groupBy) to rebuild itself.
RDD是Spark编程的核心抽象概念,它代表了一个抽象的弹性分布式数据集。Spark支持对RDD进行两类操作:transformations和actions,它们所包括的函数列表能够參考官方文档的""和""部分。
依据Spark Programming Guide文档""部分的说明。依据已经存在的数据集创建新数据集的操作被称作transformation。对数据集做计算并将结果返回driver
program的操作被称作action。
比如,map是依据传入的函数參数对已有RDD做处理。其执行结果得到一个新的RDD,所以它是一个transformation操作;而reduce则是依据传入的函数參数对已有RDD做计算,计算结果不再是个RDD。而是个详细的值(对reduce来说,计算结果是个详细的数字,而其他action(s)得到的可能是个list或其他数据结构),所以reduce是个action操作。
须要特别强调的是,Spark对全部的transformations操作都採用lazy evaluation的策略,也即spark在调度时并非对遇到的每一个transformation都马上求值以得到新的RDD,而是将针对某个RDD的一系列transformations操作记录下来,仅仅有终于遇到action操作时,Spark才会计算先前记录的每一个transformations。
这样的lazy evaluation的设计思路使得Spark得以更高效执行,由于调度器能够对从初始RDD到终于action操作路径上的transformations做合并或其他变换。且仅仅有终于的action操作结果才会返回给driver program,节省了transformations操作的中间结果在集群worker node和driver program间的传输开销。
默认情况下。调用action操作时,初始RDD经过的每一个transformation操作均会被运行一次,在多个actions会经过一系列同样的transformations操作时。这样的recompute显得并不高效。因此。在实际开发Spark计算任务脚本时。会被多个actions共用的transformations结果最好调用persist或cache缓存起来。这样会节省不少计算时间。
通过Broadcast广播的变量的作用域相应用所申请的每一个节点上的executor进程都是可见的,并且广播后。变量会一直存在于每一个worker节点的executor进程中。直到任务结束。这样能够避免RDD数据集在driver和worker节点的executor进程间频繁传输带来的开销。
尤其是对于某些用到仅仅读共享变量的应用(如须要载入字典且全部计算节点均需訪问该字典),广播能够高效地实现变量共享的目的。
5) pyspark.Accumulator
它是Spark支持的还有一种变量共享的方式(第1种方式是上面介绍的Broadcast),worker节点上的进程能够通过add()操作更新变量,更新后的变量会自己主动传播回driver program。
当应用通过SparkContext.addFile()向集群提交任务用到的文件时,调用SparkFiles类的相关方法能够解析这些文件路径并訪问文件。
7) pyspark.StorageLevel
它能够指定RDD的存储级别,如仅仅使用内存、仅仅使用磁盘、内存为主磁盘为辅,等等。具体的控制标识能够參考的文档。
1. Spark Programming Guide - RDD Operations
2. pyspark package
3. Spark Programming Guide: RDD Transformations
4. Spark Programming Guide: RDD Actions
5. pyspark package: pyspark.StorageLevel
======================= EOF ====================