Sorting really matters!
Looking at the RDD.scala source, you will notice that sorting is not listed there; that does not mean it is unimportant! (sortByKey in fact lives in OrderedRDDFunctions.scala.)
The main contents of this post:
1. Basic sorting in practice
2. Secondary sort in practice
3. More advanced sorting
4. Inside the sorting implementation
1. Basic sorting in practice
Start the HDFS cluster:
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
Start the Spark cluster:
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
Start spark-shell:
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g
scala> sc.setLogLevel("WARN") // suppress INFO-level log noise
scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect
res2: Array[(String, Int)] = Array(("",67), (the,21), (Spark,14), (to,14), (for,12), (a,10), (and,10), (##,8), (run,7), (is,6), (on,6), (can,6), (of,5), (also,5), (in,5), (if,4), (or,4), (Hadoop,4), (with,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (particular,3), (documentation,3), (example,3), (an,3), (You,3), (building,3), (that,3), (guidance,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (SQL...
scala>
From the output we can see that sortByKey(false) sorts by key in descending order; the two map(pair => (pair._2, pair._1)) swaps put the count in the key position for the sort, then restore the (word, count) shape afterwards.
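As an aside, the double swap can be avoided altogether: RDD.sortBy sorts by an arbitrary key function. A minimal sketch, assuming the same /README.md on HDFS:

scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1).sortBy(_._2, ascending = false).collect

This returns the same (word, count) pairs ordered by descending count, without swapping the tuple twice.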
The sortByKey source:
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
From this we can see that a sort creates a ShuffledRDD, with a RangePartitioner deciding which partition each key lands in.
Two questions to hold on to: why doesn't the ShuffledRDD show up in the console output above, and how does the RangePartitioner order the data? The first can be checked from the lineage, as sketched below; the second is the subject of the source reading at the end of this post.
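A quick check, sketched with the same word count: toDebugString prints the RDD lineage, and the ShuffledRDD created by sortByKey is visible there even though the collect output never mentions it.

scala> val sorted = sc.textFile("/README.md").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortByKey(false)
scala> println(sorted.toDebugString) // the lineage includes a ShuffledRDD introduced by the sort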
Good. That concludes basic sorting in practice.
2. Secondary sort in practice
Secondary sort means taking two dimensions into account while sorting: order the rows by the first column; when two rows share the same first-column key, break the tie by ordering on the second column. That second dimension is what makes it a "secondary" sort. (In the sample and code below, both columns are sorted in ascending order.)
Preparation
Input data file:
2 3
4 1
3 2
4 3
8 7
2 1
Expected output:
2 1
2 3
3 2
4 1
4 3
8 7
If you are heading for a big company, you will need to handle sorting on 5 or even 8 dimensions, not just 2. Keep at it, zhouls!
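Before the custom-key version below, a brief aside: for plain numeric columns, Scala's built-in tuple Ordering already compares element by element, so a minimal sketch needs no custom key class at all (the input path here is illustrative):

val lines = sc.textFile("input.txt") // illustrative path
val sorted = lines
  .map { line =>
    val fields = line.split(" ")
    ((fields(0).toInt, fields(1).toInt), line) // tuple key: first column, then second
  }
  .sortByKey() // uses the implicit Ordering[(Int, Int)]
  .map(_._2)
sorted.collect.foreach(println)

The explicit SecondarySortKey class developed below makes the comparison logic visible, and is straightforward to extend to 5 or 8 dimensions.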
Here we will write the code in Scala IDE for Eclipse; for setup, see the earlier post: Scala IDE for Eclipse的下载、安装和WordCount的初步使用(本地模式和集群模式) (downloading, installing, and a first WordCount in local and cluster mode).
The IDE-generated skeleton of SecondarySortKey.java:
package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private int first;
    private int second;

    @Override
    public boolean $greater(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $greater$eq(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $less(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $less$eq(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public int compare(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public int compareTo(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return 0;
    }
}
The trick to learn here: let the IDE generate the stub methods required by the Ordered interface first,
then modify them into what we actually want.
The final SecondarySortKey.java:
package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private int first;
    private int second;

    // Public constructor for the secondary-sort key.
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public boolean $greater(SecondarySortKey other) {
        if (this.first > other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second > other.getSecond()) {
            return true;
        }
        return false;
    }

    public boolean $greater$eq(SecondarySortKey other) {
        if (this.$greater(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    public boolean $less(SecondarySortKey other) {
        if (this.first < other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second < other.getSecond()) {
            return true;
        }
        return false;
    }

    public boolean $less$eq(SecondarySortKey other) {
        if (this.$less(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    // Note: subtraction works here because the sample inputs are small;
    // for arbitrary ints, Integer.compare avoids overflow.
    public int compare(SecondarySortKey other) {
        if (this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }

    public int compareTo(SecondarySortKey other) {
        if (this.first - other.getFirst() != 0) {
            return this.first - other.getFirst();
        } else {
            return this.second - other.getSecond();
        }
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySortKey other = (SecondarySortKey) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
SecondarySortApp.java:

package com.zhouls.spark.SparkApps.cores;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/*
 * Secondary sort, step by step:
 * Step 1: implement a custom sort key per the Ordered and Serializable interfaces
 * Step 2: load the file to be sorted into a <key, value> RDD
 * Step 3: use sortByKey to sort on the custom key
 * Step 4: strip the sort key, keeping only the sorted result
 */
public class SecondarySortApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps Scala's SparkContext

        JavaRDD<String> lines = sc.textFile("D://SoftWare//spark-1.5.2-bin-hadoop2.6//helloSpark.txt");

        JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                String[] splited = line.split(" ");
                SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                return new Tuple2<SecondarySortKey, String>(key, line);
            }
        });

        JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();

        // Strip the custom sort key, keeping only the sorted lines.
        JavaRDD<String> secondarySorted = sorted.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<SecondarySortKey, String> sortedContent) throws Exception {
                System.out.println("sortedContent._1 " + (sortedContent._1).toString());
                System.out.println("sortedContent._2 " + sortedContent._2);
                return sortedContent._2;
            }
        });

        secondarySorted.foreach(new VoidFunction<String>() {
            @Override
            public void call(String sorted) throws Exception {
                System.out.println(sorted);
            }
        });
    }
}
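One practical note: with setMaster("local") the foreach's println output appears in the driver console; on a real cluster, foreach runs on the executors, so its output would land in the executor logs instead.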
The Scala version. First, SecondarySortKey.scala:
package com.zhouls.spark.cores

/**
 * Created by Administrator on 2016/9/30.
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}
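Notice how much shorter this is than the Java version: the Ordered[A] trait only requires compare, and Scala derives <, <=, >, and >= from it, so the four $less/$greater methods we hand-wrote in Java come for free.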
Then SecondarySortApp.scala:

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/30.
 * Secondary sort, step by step:
 * Step 1: implement a custom sort key per the Ordered and Serializable interfaces
 * Step 2: load the file to be sorted into a <key, value> RDD
 * Step 3: use sortByKey to sort on the custom key
 * Step 4: strip the sort key, keeping only the sorted result
 */
object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local") // create SparkConf and initialize the application configuration
    val sc = new SparkContext(conf) // create SparkContext: the sole entry point for creating RDDs and the Driver's channel to the cluster
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt") // read the input file
    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    val pairWithSortKey = lines.map(line => (
      new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
    ))
    val sorted = pairWithSortKey.sortByKey()
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect.foreach(println)
  }
}
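Run locally against the six-line sample input above, this prints exactly the expected output: 2 1, 2 3, 3 2, 4 1, 4 3, 8 7.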
Homework:
1. Implement the secondary sort in Scala.
The complete SecondarySortKey.scala:
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  override def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}

object SecondarySortKey extends scala.AnyRef with Serializable {
  def apply(first: Int, second: Int): SecondarySortKey = {
    new SecondarySortKey(first, second)
  }
}
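The companion object's apply method is a small convenience: it lets callers construct keys without new, and the app below calls it explicitly as SecondarySortKey.apply(...).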
The complete SecondarySortApp.scala:
import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt")
    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    val results = lines.map(line => (SecondarySortKey.apply(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    results.collect.foreach(println)
  }
}
2. Read the RangePartitioner source:
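Before the full source, the mechanics in brief: RangePartitioner reservoir-samples each input partition (sketch), re-samples any partition whose sample was too small relative to its size, then chooses weighted boundary keys (determineBounds) so that the resulting ranges hold roughly equal numbers of records. getPartition then places a key by scanning rangeBounds linearly (for 128 or fewer bounds) or by binary search.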
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.spark

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}
import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

object Partitioner {
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, choose that one.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
}
}
}

/**
* A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
* Java's `Object.hashCode`.
*
* Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}

override def hashCode: Int = numPartitions
}

/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*
* Note that the actual number of partitions created by the RangePartitioner might not be the same
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
@transient partitions: Int,
@transient rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {

// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

private var ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
}

def numPartitions: Int = rangeBounds.length + 1

private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}

override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_, _] =>
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
}

override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < rangeBounds.length) {
result = prime * result + rangeBounds(i).hashCode
i += 1
}
result = prime * result + ascending.hashCode
result
}

@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
case _ =>
out.writeBoolean(ascending)
out.writeObject(ordering)
out.writeObject(binarySearch)
val ser = sfactory.newInstance()
Utils.serializeViaNestedStream(out, ser) { stream =>
stream.writeObject(scala.reflect.classTag[Array[K]])
stream.writeObject(rangeBounds)
}
}
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
case _ =>
ascending = in.readBoolean()
ordering = in.readObject().asInstanceOf[Ordering[K]]
binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser) { ds =>
implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
rangeBounds = ds.readObject[Array[K]]()
}
}
}
}

private[spark] object RangePartitioner {

/**
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2.toLong).sum
(numItems, sketched)
}

/**
* Determines the bounds for range partitioning from candidates with weights indicating how many
* items each represents. Usually this is 1 over the probability used to sample this candidate.
*
* @param candidates unordered candidates with weights
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight > target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
}
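To tie this back to practice, a minimal sketch of using RangePartitioner directly, which is what sortByKey does internally (toy data; note that partitionBy only range-partitions the keys, it does not sort within each partition):

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((4, "d"), (1, "a"), (3, "c"), (2, "b")))
val part = new RangePartitioner(2, pairs) // samples the RDD to choose the range bounds
val partitioned = pairs.partitionBy(part) // low keys go to partition 0, high keys to partition 1 (roughly)
partitioned.glom().collect().foreach(p => println(p.mkString(", ")))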
References:
http://blog.sina.com.cn/s/blog_4a7854d90102ws97.html
http://blog.csdn.net/duan_zhihua/article/details/50761582