Spark编程之基本的RDD算子sparkContext,foreach,foreachPartition, collectAsMap
- 1) context, sparkContext
返回一个RDD的sparkContext,当我们需要用到一个sc的时候,可以通过rdd.sparkContext来获取这个rdd的sc。
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.context //通过调用context方法可以获得创建这个rdd的sparkcontext。
res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1
- 2) foreach
执行一个不返回值的函数操作,传入的参数是一个函数,返回值为unit。
def foreach(f: T => Unit)
val c = sc.parallelize(List("cat", "dog", "tiger",
"lion", "gnu", "crocodile", "ant", "whale",
"dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
- 3) foreachPartition
穿入的值为一个函数,对每一个分区执行一个操作,但是不返回值。这个函数的接收参数为Iterator类型。
def foreachPartition(f: Iterator[T] => Unit)
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
b.foreachPartition(x => println(x.reduce(_ + _)))
// 注意这个x 其实是一个iterator类型。
6
15
24
- 4) lookup
lookup作用于键值对类型的数据,会去查找RDD中的key值为特定值的数据,然后将该数据封装在Seq类型中返回。
def lookup(key: K): Seq[V]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5) //在这里长度为5的只有 tiger和eagle类型,所以返回的结果如下:
res0: Seq[String] = WrappedArray(tiger, eagle)
- 5) collect, toArray
这个算子是一个action类型,也就意味着spark执行到这个算子的时候会启动一个job来进行执行。
会将一个RDD转换成为一个数组返回。
def collect(): Array[T]
def toArray(): Array[T]
值得注意的是,collect有一个重载,传入的是一个偏函数。
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
如果传入的是一个偏函数的话,比如f = T -> U, 他会将这个值转换为U类型执行好之后,然后输出。
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
- 6) collectAsMap [Pair]
这个类似于collect,不过它作用于key-value类型的RDD,然后将其以一个Map类型的数据返回。
def collectAsMap(): Map[K, V]
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap //返回值为Map类型
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)