时间:2024-09-17 11:34:44

Working with key/value Pairs

Motivation

  • Pair RDDs are a useful building block in many programs, as they expose operations that allow u to act on each key in parallel or regroup data across network.
  • Eg: pair RDDs have a reduceByKey() method that can aggeragate data separately for each key; join() method that can merge two RDDs together by grouping elements with the same key.

Creating Pair RDDs

  • Many formats we loading from will directly return pair RDDs for their k/v values.(许多格式的数据加载会直接返回pair RDDs)
  • By turning a regular RDD into a pair RDD  --> Using map() function  (通过转换得到)
val pairs = lines.map(x => (x.split(" ")(0), x))

Transformation on Pair RDDs

  • 我们同样可以给Spark传送函数,不过由于pair RDDs包含的是元组tuple,所以我们要传送的函数式操作在tuples之上的。实际上Pair RDDs就是RDDs of Tuple2 object。

Aggregations

  • reduceByKey()和reduce()很相似:它们都接收一个函数并使用该函数来combine values。它们的不同在于:
    1. reduceByKey()并行地为数据集中每个key运行reduce操作。
    2. reduceByKey()属于transformation,它返回一个新的RDD。这样做是考虑到数据集有大量的keys。
  • foldByKey()与fold()相似:都使用与RDD中数据同类型的zero value,以及一个combination函数。
    1. foldByKey()所提供的zero会应用于每个value
  • 与MR的combiner概念类似:调用reduceByKey()和foldByKey()时会自动在每个机器本地进行combine。用户不需指定combiner。更通用的combineByKey()接口可以允许你自定义combiner。
val input = sc.textFile("input/")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x = > (x, 1)).reduceByKey((x, y) => x + y)
  • combineByKey()是最general的per-key aggregation functions. combinerByKey():对一个partition中的每个元素,如果该元素的key原来没出现过,combineByKey()使用我们提供的函数createCombiner()来为该key创建一个accumulator初始值(要注意这里如果是一个已经出现过的key,那么不会执行此操作)。如果该key在处理该partition时已经出现过,那么将会使用给的的mergeValue()函数,操作在当前value和该key的accumulator之上。这样,每个partition被独立处理之后,对同一个key我们得到多个accumulators。当我们merging每个partition的结果时,会对相同key使用用户提供的mergeCombiners()函数。没看懂的话仔细看下面这段求平均值的例子!~
val result = input.combineByKey(
(v) => (v, 1), // createCombiner(), value => (value, num)
(acc:(Int, Int), v) => (acc._1 +v, acc._2 + 1), // mergeValue(), for each key: ((valueSum, nums), value) => (valueSum +value, nums + 1)
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners(), for each key: (valueSumOnOnePartition, nums), (valueSumOnAnotherPartition, nums) => (_+_, _+_)
).map{case (key, value) => (key, value._1 / value._2.toFloat)} // (key,(valueSum, nums)) => (key, valueSum / float(nums))
// we can use mapValues(v => (v._1 / v._2.toFloat))
result.collectAsMap().map(println(_))
  • 很多options可以根据key来combine数据,他们基本上都是基于combineByKey()来实现的,只是在上层提供了更简单的接口。
  • 如果我们知道数据并不会从中获益时, 我们可以disable map-side aggregation in combineByKey()。比如groupByKey()的map端聚合被disable,因为appending to list不会save any space.

Tuning the level of parallelism

  • spark如何决定如何划分作业?
  • 每个RDD都有一个固定的partitions数目,它决定了执行RDD操作时的并行度。Spark会试图根据你的cluster size推断一个实用的缺省值。
  • 当执行聚合aggregation或分组grouping操作时,我们可以要求Spark使用指定的partitions数量。
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey((x, y) => x + y) // default parallelism
sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // custom parallelism

Grouping Data

  • 带有key的数据最常用的用例就是根据key划分数据
  • groupByKey(): [k, V] -> [K, Iterable[V]]
  • groupBy(func): works on unpaired data,会根据传入的func计算key,并进行分组。
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect//分成两组
/*结果
Array(
(even,ArrayBuffer(2, 4, 6, 8)),
(odd,ArrayBuffer(1, 3, 5, 7, 9))
)
*/
  • 如果你发现你在使用groupByKey()之后在values上使用reduce()或fold()。那么你可以更高效地用per-key聚合函数来替换它。比如说rdd.reduceByKey(func) 和rdd.groupBy Key().mapValues(value => value.reduce(func)) 产生的是同样的RDD,但是前者更高效,因为前者避免了对每个key创建a list of values的过程。
  • 除了从一个单一的RDD聚合数据,我们还可以从多个sharing 相同key的RDDs中聚合数据 --> 使用cogroup()函数。
    • cogroup(): RDD[(K, V)], RDD[(K, W)] --> RDD[(K, (Iterable[V], Iterable[W]))]
    • cogroup()是join、intersect等的底层实现

Joins

  • 包含了左/右外连接,交叉连接和内连接。

Sorting Data

  • 只要key上定义了排序准则,我们就可以对键值对进行排序
  • sortByKey(): 可以提供自定义的comparison函数
val input: RDD[(Int, Venue)] =
implicit val sortIntergersByString = new Ordering[Int]{
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()

Actions Available on Pair RDDs

  • 一些附加的pair RDDs可用的actions如下,它们可以利用键值对的天然优势
    • countByKey(): 为每个键值的元素统计数量
    • collectAsMap(): 将结果收集成map结构的,以便提供方便的查找功能
    • lookup(key): 返回key关联的value。eg: rdd.lookup(3)返回[4, 6]
    • ...

Data Partitioning(Advanced)

  • 这一部分我们会了解到如何控制数据集在节点之间的partitioning。Spark的分区在所有的键值对RDD上可用。
  • 在分布式程序中,通信是十分昂贵的。因此合理的安放数据来减少网络流量可以大幅度提高性能。
  • 并不是在所有的应用中,partitioning都是很有帮助的。比如说一个给定的RDD只扫描一次,那么就没有必要预先为他划分分区。
  • 当一个数据多次在面向key的操作中被重用时,预先partitioning是十分有用的。
  • eg:在这个例子中我们有一个userData(UserID, LinkInfo)表(不更新),和一个events(UserID, LinkInfo)表(每n分钟实时更新)。在这个例子中我们可以先用partitionBy()对userData进行分区,这样在之后join的时候Spark知道该数据已经是hash-partitioned的,就不会在shuffle该数据了。而events表是实时更新,并且只使用一次,所以没有必要预先partition。
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist() // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components !userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
  • 关于partitionBy()
    • 要注意的是partitionBy()属于transformation,它返回的是一个新的RDD,所以我们需要persist()并保存为userData。注意一定要persist().
    • partitionBy()中的100表示分区数量,它会控制further opertions on the RDD的并行任务数。通常它被设置成你的cluster的cores数目。
  • 如果预先知道partitioning的信息,很多操作都会从中获益。比如sortByKey()和groupByKey()分别会受益于range-partitioned和hash-partitioned RDD.
  • 但是,一些操作比如map()会导致新的RDD忘记parent的分区信息,因为那些操作理论上会改变每个记录的key。(但是像mapValues()这样的就会保存父RDD的partition信息)。

Determining an RDD's Partitioner

  • 在Scala和Java中,我们可以通过partitioner property来确定它是如何partitioned的。
  • 对于Scala,看下面这个例子:
  • scala> data1.partitioner
    res2: Option[org.apache.spark.Partitioner] = None scala> data1.partitionBy(new HashPartitioner(2))
    res3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:30 scala> data1.partitioner
    res4: Option[org.apache.spark.Partitioner] = None scala> val data2 = data1.partitionBy(new HashPartitioner(2))
    data2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:29 scala> data2.partitioner
    res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
    • RDD.partitioner:返回的是Option类。关于Option:Option是Scala提供的一个容器,其中可能包含也可能不包含item。我们可以在Option上调用isDefined() 来判断其是否含有值,并且使用get()来获取值。
    • @transient val partitioner: Option[Partitioner] = None
    • 其次就是要记得RDD是不可变的- -,要新建一个变量data2来保存partition后的数据。如果我们后续还想使用data2的话,要调用persist()。

Operations that Benefit from Partitioning

  • 许多Spark操作包含了在网络间shuffle数据的过程,这些操作都会从partitioning中受益。
  • cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), lookup()...
  • 比如reduceByKey()如果操作在已经预分区的RDD上,那么每个key的values都可以本地计算了。
  • 对cogroup()和join()操作,预分区会使至少一个RDDs不需要被shuffle。如果两个RDDs都使用同样的partitioner,并且都cache在相同的机器上(比如其中一个是通过另一个使用mapValues()得到的),那么将不会有shuffle发生。

Operations that Affect Partitioning

  • Spark知道操作是如何影响partitioning的,因此它会自动的为由某RDD转化而得的RDD设置partitioner
    • 比如:你join()两个RDDs,那么相同key的元素会被hash到同一个machine,因此Spark知道结果是hash-partitioned。那么对join所得的RDD进行reduceByKey()会很快。
  • 但是一些transformation不能保证得到已知的partitioning,那么输出的RDD将不会有partitioner set。
    • 比如在hash-partitioned RDD上调用map()所得RDD将不会有partitioner。因为理论上map()会改变每个元素的key,而Spark不会分析你的函数来检查你是否保持key。因此,相应的,你可以使用mapValues()和flatMapValues()来保证每个元组的key保持不变。所以如果你没有改变元素的key,最好调用mapValues()和flatMapValues()。
  • 保证result RDD 已经partitioned的操作有:cogroup(), groupWith(), join(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues()[如果其父RDD已经partitioned], flatMapValues()[如果其父RDD已经partitioned], filter()[如果其父RDD已经partitioned]。 而其他的操作会得到一个没有partitioner的RDD。

Example: PageRank

  • PageRank是一个能从RDD partitioning受益的例子。
  • PageRank是一个多次join的迭代算法。算法有两个数据集:(pageID, linkList) elements, (pageID, rank) elements
  • // Assume that our neighbor list was saved as a Spark objectFile
    val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100))
    .persist()
    // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD // will have the same partitioner as links
    var ranks = links.mapValues(v => 1.0)
    // Run 10 iterations of PageRank
    for(i<-0until10){
    val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size))
    }
    ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) }
    // Write out the final ranks
    ranks.saveAsTextFile("ranks")
  • 该程序有几个trick:
    • 对links进行pre-partition并persist,这样links再也不用shuffle。
    • 初始化ranks时使用mapValues()保留links的分区,这样第一次join的时候完全不用shuffle。
    • 对ranks进行reduceByKey()所得结果是hash-partitioned的,所以我们调研mapValues(),那么所得的ranks仍然是hash-partitioned的。

Custom Partitioners

  • Spark提供了HashPartitioner和RangePartitioner来满足很多情况,但是你仍然可以自定义Partitioner。你可以使用领域知识来减少通信。
  • 比如,我们之前的PageRank算法中每个pageID使用URL表示的,那么相似的URLs(e.g., http://www.cnn.com/WORLD and http://www.cnn.com/US)可能被hash到完全不同的节点。但是有相同域名的URL可能更容易指向彼此。同时PageRank算法每次迭代都会从每个page发送信息到该page的neighbors。因此我们可以自定义一个partitioner,该partitioner只look at域名而不是整个URL。
  • 为实现自定义partitioner,你需要继承org.apache.spark.Partitioner,并且实现以下三个方法:
    • numPartitions:Int, 返回你将会创建的partitions的数目
    • getPartition(key: Any): Int, 返回给定key的partitionID
    • equals(), 标准Java equality方法。Spark通过该方法来比较两个Partitioner objects。
  • class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
    val domain = new Java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if(code<0){
    code + numPartitions // Make it non-negative }
    else{
    code
    }
    }
    // Java equals method to let Spark compare our Partitioner objects
    override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
    dnp.numPartitions == numPartitions
    case _ =>
    false
    } }
  • 使用很简单,直接将custom Partitioner类传到partitionBy()方法即可。