Working with key/value Pairs
Motivation
- Pair RDDs are a useful building block in many programs, as they expose operations that allow u to act on each key in parallel or regroup data across network.
- Eg: pair RDDs have a reduceByKey() method that can aggeragate data separately for each key; join() method that can merge two RDDs together by grouping elements with the same key.
Creating Pair RDDs
- Many formats we loading from will directly return pair RDDs for their k/v values.(许多格式的数据加载会直接返回pair RDDs)
- By turning a regular RDD into a pair RDD --> Using map() function (通过转换得到)
val pairs = lines.map(x => (x.split(" ")(0), x))
Transformation on Pair RDDs
- 我们同样可以给Spark传送函数,不过由于pair RDDs包含的是元组tuple,所以我们要传送的函数式操作在tuples之上的。实际上Pair RDDs就是RDDs of Tuple2 object。
Aggregations
- reduceByKey()和reduce()很相似:它们都接收一个函数并使用该函数来combine values。它们的不同在于:
- reduceByKey()并行地为数据集中每个key运行reduce操作。
- reduceByKey()属于transformation,它返回一个新的RDD。这样做是考虑到数据集有大量的keys。
- foldByKey()与fold()相似:都使用与RDD中数据同类型的zero value,以及一个combination函数。
- foldByKey()所提供的zero会应用于每个value
- 与MR的combiner概念类似:调用reduceByKey()和foldByKey()时会自动在每个机器本地进行combine。用户不需指定combiner。更通用的combineByKey()接口可以允许你自定义combiner。
val input = sc.textFile("input/")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x = > (x, 1)).reduceByKey((x, y) => x + y)
- combineByKey()是最general的per-key aggregation functions. combinerByKey():对一个partition中的每个元素,如果该元素的key原来没出现过,combineByKey()使用我们提供的函数createCombiner()来为该key创建一个accumulator初始值(要注意这里如果是一个已经出现过的key,那么不会执行此操作)。如果该key在处理该partition时已经出现过,那么将会使用给的的mergeValue()函数,操作在当前value和该key的accumulator之上。这样,每个partition被独立处理之后,对同一个key我们得到多个accumulators。当我们merging每个partition的结果时,会对相同key使用用户提供的mergeCombiners()函数。没看懂的话仔细看下面这段求平均值的例子!~
val result = input.combineByKey(
(v) => (v, 1), // createCombiner(), value => (value, num)
(acc:(Int, Int), v) => (acc._1 +v, acc._2 + 1), // mergeValue(), for each key: ((valueSum, nums), value) => (valueSum +value, nums + 1)
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners(), for each key: (valueSumOnOnePartition, nums), (valueSumOnAnotherPartition, nums) => (_+_, _+_)
).map{case (key, value) => (key, value._1 / value._2.toFloat)} // (key,(valueSum, nums)) => (key, valueSum / float(nums))
// we can use mapValues(v => (v._1 / v._2.toFloat))
result.collectAsMap().map(println(_))
- 很多options可以根据key来combine数据,他们基本上都是基于combineByKey()来实现的,只是在上层提供了更简单的接口。
- 如果我们知道数据并不会从中获益时, 我们可以disable map-side aggregation in combineByKey()。比如groupByKey()的map端聚合被disable,因为appending to list不会save any space.
Tuning the level of parallelism
- spark如何决定如何划分作业?
- 每个RDD都有一个固定的partitions数目,它决定了执行RDD操作时的并行度。Spark会试图根据你的cluster size推断一个实用的缺省值。
- 当执行聚合aggregation或分组grouping操作时,我们可以要求Spark使用指定的partitions数量。
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey((x, y) => x + y) // default parallelism
sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // custom parallelism
Grouping Data
- 带有key的数据最常用的用例就是根据key划分数据
- groupByKey(): [k, V] -> [K, Iterable[V]]
- groupBy(func): works on unpaired data,会根据传入的func计算key,并进行分组。
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect//分成两组
/*结果
Array(
(even,ArrayBuffer(2, 4, 6, 8)),
(odd,ArrayBuffer(1, 3, 5, 7, 9))
)
*/
- 如果你发现你在使用groupByKey()之后在values上使用reduce()或fold()。那么你可以更高效地用per-key聚合函数来替换它。比如说rdd.reduceByKey(func) 和rdd.groupBy Key().mapValues(value => value.reduce(func)) 产生的是同样的RDD,但是前者更高效,因为前者避免了对每个key创建a list of values的过程。
- 除了从一个单一的RDD聚合数据,我们还可以从多个sharing 相同key的RDDs中聚合数据 --> 使用cogroup()函数。
- cogroup(): RDD[(K, V)], RDD[(K, W)] --> RDD[(K, (Iterable[V], Iterable[W]))]
- cogroup()是join、intersect等的底层实现
Joins
Sorting Data
- 只要key上定义了排序准则,我们就可以对键值对进行排序
- sortByKey(): 可以提供自定义的comparison函数
val input: RDD[(Int, Venue)] =
implicit val sortIntergersByString = new Ordering[Int]{
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
Actions Available on Pair RDDs
- 一些附加的pair RDDs可用的actions如下,它们可以利用键值对的天然优势
- countByKey(): 为每个键值的元素统计数量
- collectAsMap(): 将结果收集成map结构的,以便提供方便的查找功能
- lookup(key): 返回key关联的value。eg: rdd.lookup(3)返回[4, 6]
- ...
Data Partitioning(Advanced)
- 这一部分我们会了解到如何控制数据集在节点之间的partitioning。Spark的分区在所有的键值对RDD上可用。
- 在分布式程序中,通信是十分昂贵的。因此合理的安放数据来减少网络流量可以大幅度提高性能。
- 并不是在所有的应用中,partitioning都是很有帮助的。比如说一个给定的RDD只扫描一次,那么就没有必要预先为他划分分区。
- 当一个数据多次在面向key的操作中被重用时,预先partitioning是十分有用的。
- eg:在这个例子中我们有一个userData(UserID, LinkInfo)表(不更新),和一个events(UserID, LinkInfo)表(每n分钟实时更新)。在这个例子中我们可以先用partitionBy()对userData进行分区,这样在之后join的时候Spark知道该数据已经是hash-partitioned的,就不会在shuffle该数据了。而events表是实时更新,并且只使用一次,所以没有必要预先partition。
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist()
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components !userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
- 关于partitionBy()
- 要注意的是partitionBy()属于transformation,它返回的是一个新的RDD,所以我们需要persist()并保存为userData。注意一定要persist().
- partitionBy()中的100表示分区数量,它会控制further opertions on the RDD的并行任务数。通常它被设置成你的cluster的cores数目。
- 如果预先知道partitioning的信息,很多操作都会从中获益。比如sortByKey()和groupByKey()分别会受益于range-partitioned和hash-partitioned RDD.
- 但是,一些操作比如map()会导致新的RDD忘记parent的分区信息,因为那些操作理论上会改变每个记录的key。(但是像mapValues()这样的就会保存父RDD的partition信息)。
Determining an RDD's Partitioner
- 在Scala和Java中,我们可以通过partitioner property来确定它是如何partitioned的。
- 对于Scala,看下面这个例子:
-
scala> data1.partitioner
res2: Option[org.apache.spark.Partitioner] = None
scala> data1.partitionBy(new HashPartitioner(2))
res3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:30
scala> data1.partitioner
res4: Option[org.apache.spark.Partitioner] = None
scala> val data2 = data1.partitionBy(new HashPartitioner(2))
data2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:29
scala> data2.partitioner
res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
- RDD.partitioner:返回的是Option类。关于Option:Option是Scala提供的一个容器,其中可能包含也可能不包含item。我们可以在Option上调用isDefined() 来判断其是否含有值,并且使用get()来获取值。
@transient val partitioner: Option[Partitioner] = None
- 其次就是要记得RDD是不可变的- -,要新建一个变量data2来保存partition后的数据。如果我们后续还想使用data2的话,要调用persist()。
Operations that Benefit from Partitioning
- 许多Spark操作包含了在网络间shuffle数据的过程,这些操作都会从partitioning中受益。
- cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), lookup()...
- 比如reduceByKey()如果操作在已经预分区的RDD上,那么每个key的values都可以本地计算了。
- 对cogroup()和join()操作,预分区会使至少一个RDDs不需要被shuffle。如果两个RDDs都使用同样的partitioner,并且都cache在相同的机器上(比如其中一个是通过另一个使用mapValues()得到的),那么将不会有shuffle发生。
Operations that Affect Partitioning
- Spark知道操作是如何影响partitioning的,因此它会自动的为由某RDD转化而得的RDD设置partitioner
- 比如:你join()两个RDDs,那么相同key的元素会被hash到同一个machine,因此Spark知道结果是hash-partitioned。那么对join所得的RDD进行reduceByKey()会很快。
- 但是一些transformation不能保证得到已知的partitioning,那么输出的RDD将不会有partitioner set。
- 比如在hash-partitioned RDD上调用map()所得RDD将不会有partitioner。因为理论上map()会改变每个元素的key,而Spark不会分析你的函数来检查你是否保持key。因此,相应的,你可以使用mapValues()和flatMapValues()来保证每个元组的key保持不变。所以如果你没有改变元素的key,最好调用mapValues()和flatMapValues()。
- 保证result RDD 已经partitioned的操作有:cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues()[如果其父RDD已经partitioned], flatMapValues()[如果其父RDD已经partitioned], filter()[如果其父RDD已经partitioned]。 而其他的操作会得到一个没有partitioner的RDD。
Example: PageRank
- PageRank是一个能从RDD partitioning受益的例子。
- PageRank是一个多次join的迭代算法。算法有两个数据集:(pageID, linkList) elements, (pageID, rank) elements
// Assume that our neighbor list was saved as a Spark objectFile
val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100))
.persist()
// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD // will have the same partitioner as links
var ranks = links.mapValues(v => 1.0)
// Run 10 iterations of PageRank
for(i<-0until10){
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) }
// Write out the final ranks
ranks.saveAsTextFile("ranks")
- 该程序有几个trick:
- 对links进行pre-partition并persist,这样links再也不用shuffle。
- 初始化ranks时使用mapValues()保留links的分区,这样第一次join的时候完全不用shuffle。
- 对ranks进行reduceByKey()所得结果是hash-partitioned的,所以我们调研mapValues(),那么所得的ranks仍然是hash-partitioned的。
Custom Partitioners
- Spark提供了HashPartitioner和RangePartitioner来满足很多情况,但是你仍然可以自定义Partitioner。你可以使用领域知识来减少通信。
- 比如,我们之前的PageRank算法中每个pageID使用URL表示的,那么相似的URLs(e.g., http://www.cnn.com/WORLD and http://www.cnn.com/US)可能被hash到完全不同的节点。但是有相同域名的URL可能更容易指向彼此。同时PageRank算法每次迭代都会从每个page发送信息到该page的neighbors。因此我们可以自定义一个partitioner,该partitioner只look at域名而不是整个URL。
- 为实现自定义partitioner,你需要继承org.apache.spark.Partitioner,并且实现以下三个方法:
- numPartitions:Int, 返回你将会创建的partitions的数目
- getPartition(key: Any): Int, 返回给定key的partitionID
- equals(), 标准Java equality方法。Spark通过该方法来比较两个Partitioner objects。
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if(code<0){
code + numPartitions // Make it non-negative }
else{
code
}
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
case dnp: DomainNamePartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
} }
- 使用很简单,直接将custom Partitioner类传到partitionBy()方法即可。