一、map、flatMap、mapParations、mapPartitionsWithIndex
1.1 map
map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
(1) 使用Java进行编写
public static void map() { List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝"); JavaRDD<String> rdd = jsc.parallelize(list); JavaRDD<String> map = rdd.map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String name) throws Exception { return "hello,"+name; } }); map.foreach(new VoidFunction<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(String msg) throws Exception { System.out.println(msg); } }); }
(2) 使用scala进行编写
def map(): Unit = { val list = List("李光洙","刘在石","哈哈","宋智孝"); val rdd = sc.parallelize(list) val map = rdd.map(s => "hello," + s).foreach(println) }
(3)运行结果
(4) 总结
可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。
1.2 flatMap
flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。
(1) 使用Java进行编写
public static void flatmap() { List<String> list = Arrays.asList("李光洙 刘在石","哈哈 宋智孝"); JavaRDD<String> rdd = jsc.parallelize(list); JavaRDD<String> map = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }).map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String s) throws Exception { return "你好," + s; } }); map.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); }
(2) 使用scala进行编写
def flatmap(): Unit = { val list = List("李光洙 刘在石","哈哈 宋智孝"); val rdd = sc.parallelize(list) rdd.flatMap(_.split(" ")).map("你好,"+_).foreach(println) }
(3) 运行结果
(4) 总结
flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。
map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。
map是对RDD中元素逐一进行函数操作映射为另外一个RDD,而flatMap操作是将函数应用于RDD之中的每一个元素,将返回的迭代器的所有内容构成新的RDD。而flatMap操作是将函数应用于RDD中每一个元素,将返回的迭代器的所有内容构成RDD。
flatMap与map区别在于map为“映射”,而flatMap“先映射,后扁平化”,map对每一次(func)都产生一个元素,返回一个对象,而flatMap多一步就是将所有对象合并为一个对象。
1.3 mapPartitions
与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
(1) 使用Java进行编写
public static void mapPartitions() { JavaRDD<String> textFile = jsc.textFile("words",3); textFile.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Iterator<String> is) throws Exception { System.out.println("创建数据库连接。。。。"); List<String> list = new ArrayList<String>(); while(is.hasNext()) { list.add(is.next()); System.out.println("模拟向数据库插入批量数据。。。"); } System.out.println("关闭数据库连接。。。"); return list; } }).collect(); }
(2) 使用scala进行编写
def mapPartitions: Unit = { val rdd1 = sc.textFile("words") val mapResult = rdd1.mapPartitions(iter =>{ println("打开数据库。。。") val list = List() while(iter.hasNext){ list.addString(new StringBuilder(iter.next())) println("插入数据库。。。") } println("关闭数据库。。。") list.iterator }, false) mapResult.foreach(println) }
(3) 运行结果
(4)总结
mapPartitions比较适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支。
1.4 mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是什么
(1)使用Java编写
public static void mapPartitionsWithIndex() { List<String> list = Arrays.asList("李光洙","刘在石","哈哈","宋智孝"); JavaRDD<String> rdd = jsc.parallelize(list,3); JavaRDD<String> rdd2 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception { List<String> list = new ArrayList<String>(); while(iter.hasNext()) { list.add(index+"_"+iter.next()); } return list.iterator(); } }, true); rdd2.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); }
(2)使用scala编写
def mapPartitionsWithIndex: Unit = { val list = List("李光洙","刘在石","哈哈","宋智孝") val rdd1 = sc.parallelize(list, 3) val rdd2 = rdd1.mapPartitionsWithIndex((index,iter)=>{ val l = ListBuffer[String]() while(iter.hasNext){ val v = iter.next() l.append(index+"_"+v) } l.iterator }, true).foreach(println) }
(3)结果
二.
2.1 fillter
过滤操作,满足filter内function函数为true的RDD内所有元素组成一个新的数据集。如:filter(a == 1)。
(1)使用Java编写
public static void fillter() { List<Integer> list = Arrays.asList(1,2,3,4,5,6); JavaRDD<Integer> rdd = jsc.parallelize(list); rdd.filter(new Function<Integer, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Integer i) throws Exception { return i%2==0; } }).foreach(new VoidFunction<Integer>() { @Override public void call(Integer a) throws Exception { System.out.println(a); } }); }
(2)使用scala编写
def fillter: Unit = { val list = List(1,2,3,4,5,6) val rdd1 = sc.parallelize(list) rdd1.filter(_%2==0).foreach(println) }
(3)结果
2.2 sample(withReplacement, fraction, seed)
采样操作,用于从样本中取出部分数据。withReplacement是否放回,fraction采样比例,seed用于指定的随机数生成器的种子。(是否放回抽样分true和false,fraction取样比例为(0, 1]。seed种子为整型实数。)
(1)使用Java编写
public static void sample() { List<Integer> list = Arrays.asList(1,2,3,4,5,6); JavaRDD<Integer> rdd = jsc.parallelize(list); rdd.sample(false, 0.5, 1).foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)使用scala编写
def sample: Unit = { val list = List(1,2,3,4,5,6) val rdd1 = sc.parallelize(list) rdd1.sample(false,0.5, 1).foreach(println) }
(3)结果
2.3 cartesian
cartesian是用于求笛卡尔积的,该操作不会执行shuffle操作。
(1)使用Java编写
public static void cartesian() { List<String> list1 = Arrays.asList("A","B","C"); List<Integer> list2 = Arrays.asList(1,2,3); JavaRDD<String> rdd1 = jsc.parallelize(list1); JavaRDD<Integer> rdd2 = jsc.parallelize(list2); rdd1.cartesian(rdd2).foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println(tuple._1+"---->"+tuple._2); } }); }
(2)使用scala编写
def cartesian: Unit = { val list1 = List("A","B","C") val list2 = List(1,2,3) val rdd1 = sc.parallelize(list1) val rdd2 = sc.parallelize(list2) rdd1.cartesian(rdd2).foreach(tuple =>println(tuple._1+"--->"+tuple._2)) }
(3)结果
2.4 reduceByKey(function,[numTasks])
reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。
(1)使用Java编写
public static void reduceByKey() { JavaRDD<String> rdd = jsc.textFile("words"); rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }).mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1+v2; } }).foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println(s); } }); }
(2)使用scala编写
def reduceByKey: Unit = { sc.textFile("words").flatMap(_.split(" ")).map(new Tuple2(_,1)).reduceByKey((_+_)).foreach(println) }
(3)结果
三、union,join和groupByKey
3.1 union
当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。不去重。
(1)使用Java编写
public static void union() { List<Integer> list1 = Arrays.asList(1,2,3,4); List<Integer> list2 = Arrays.asList(3,4,5,6); JavaRDD<Integer> rdd1 = jsc.parallelize(list1); JavaRDD<Integer> rdd2 = jsc.parallelize(list2); rdd1.union(rdd2).foreach(new VoidFunction<Integer>() { @Override public void call(Integer i) throws Exception { System.out.println(i); } }); }
(2)使用scala编写
def union: Unit = { val list1 = List(1,2,3,4) val list2 = List(3,4,5,6) val rdd1 = sc.parallelize(list1) val rdd2 = sc.parallelize(list2) rdd1.union(rdd2).foreach(println) }
(3)结果
3.2 groupByKey
groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。
(1)使用Java编写
public static void groupByKey() { List<Tuple2<String,String>> list = Arrays.asList( new Tuple2("Doctor","A"), new Tuple2("Actor","B"), new Tuple2("Doctor","C"), new Tuple2("Actor","D") ); JavaPairRDD<String, String> rdd = jsc.parallelizePairs(list); JavaPairRDD<String, Iterable<String>> rdd2 = rdd.groupByKey(); rdd2.foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() { @Override public void call(Tuple2<String, Iterable<String>> t) throws Exception { String s = t._1; Iterator<String> iter = t._2.iterator(); String person = ""; while(iter.hasNext()) { person = person + iter.next()+" "; } System.out.println("职业:"+s+",人员:"+person); } }); }
(2)使用scala编写
def groupByKey: Unit = { val list = List( new Tuple2("Doctor","A"), new Tuple2("Actor","B"), new Tuple2("Doctor","C"), new Tuple2("Actor","D") ) val rdd = sc.parallelize(list) rdd.groupByKey().foreach(t =>{ val s = t._1 val iter = t._2.iterator var person = "" while(iter.hasNext) person = person + iter.next + " " println("职业:"+s+",人员:"+person) } ) }
(3)结果
3.3 join
join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
(1)使用Java编写
public static void join() { List<Tuple2<Integer,String>> list1 = Arrays.asList( new Tuple2<Integer,String>(1,"小明"), new Tuple2<Integer,String>(2,"小米"), new Tuple2<Integer,String>(3,"晓红"), new Tuple2<Integer,String>(4,"小绿") ); List<Tuple2<Integer,Integer>> list2 = Arrays.asList( new Tuple2<Integer,Integer>(1,88), new Tuple2<Integer,Integer>(2,99), new Tuple2<Integer,Integer>(3,100), new Tuple2<Integer,Integer>(4,98) ); JavaPairRDD<Integer, String> rdd1 = jsc.parallelizePairs(list1); JavaPairRDD<Integer, Integer> rdd2 = jsc.parallelizePairs(list2); JavaPairRDD<Integer, Tuple2<String, Integer>> rdd = rdd1.join(rdd2); rdd.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() { @Override public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception { System.out.println("学号:"+tuple._1+",姓名:"+tuple._2._1+",分数:"+tuple._2._2); } }); }
(2)使用scala编写
def join: Unit = { val list1 = List( new Tuple2(1,"小明"), new Tuple2(2,"小米"), new Tuple2(3,"晓红"), new Tuple2(4,"小绿") ) val list2 = List( new Tuple2(1,88), new Tuple2(2,99), new Tuple2(3,100), new Tuple2(4,98) ) val rdd1 = sc.parallelize(list1) val rdd2 = sc.parallelize(list2) rdd1.join(rdd2).foreach(tuple => { println("学号:"+tuple._1+",姓名:"+tuple._2._1+",分数:"+tuple._2._2) }) }
(3)结果
四、distinct、intersection、subtract
4.1 distinct([numTasks])
返回一个在源数据集去重之后的新数据集,即去重,并局部无序而整体有序返回。
(1)使用Java编写
public static void distinct() { List<Integer> list = Arrays.asList(1,1,2,3,4,55,55,6,4,32,2,1,3); jsc.parallelize(list).distinct().foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)使用scala编写
def distinct: Unit = { val list = List(1,1,2,3,4,55,55,6,4,32,2,1,3) sc.parallelize(list).distinct().foreach(println) }
(3)结果
4.2 intersection
对于源数据集和其他数据集求交集,并去重,且无序返回。
(1)使用Java编写
public static void intersection() { List<Integer> list1 = Arrays.asList(1,2,3,4); List<Integer> list2 = Arrays.asList(3,4,5,6,1,3); JavaRDD<Integer> rdd1 = jsc.parallelize(list1); JavaRDD<Integer> rdd2 = jsc.parallelize(list2); rdd1.intersection(rdd2).foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)使用scala编写
def intersection: Unit = { val list1 = List(1,2,3,4) val list2 = List(3,4,5,6,1,3) val rdd1 = sc.parallelize(list1) val rdd2 = sc.parallelize(list2) rdd1.intersection(rdd2).foreach(println) }
(3)结果
4.3 subtract
subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。
(1)使用Java编写
public static void subtract() { List<Integer> list1 = Arrays.asList(1,2,3,4,11,12); List<Integer> list2 = Arrays.asList(3,4,5,6,1,3); JavaRDD<Integer> rdd1 = jsc.parallelize(list1); JavaRDD<Integer> rdd2 = jsc.parallelize(list2); rdd1.subtract(rdd2).foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)结果
五、coalesce、repartition、repartitionAndSortWithinPartitions
5.1 coalesce
该函数用于将RDD进行重分区,使用HashPartitioner。
重新分区,减少RDD中分区的数量到numPartitions。
(1)使用Java编写
public static void coalesce() { JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,3,45,2,4,542,2),3); rdd.coalesce(1).foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)结果
5.2 repartition
进行重分区,解决的问题:本来分区数少 -》 增加分区数
(1)使用Java编写
public static void repartition() { jsc.parallelize(Arrays.asList(1,3,4,24,14,421,1),2).repartition(4).foreach(new VoidFunction<Integer>() { @Override public void call(Integer s) throws Exception { System.out.println(s); } }); }
(2)结果
5.3 repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。
(1)使用Java编写
public static void repartitionAndSortWithinPartitions() { JavaPairRDD<Integer, Integer> rdd = jsc.parallelize(Arrays.asList(1,3,4,24,14,421,1),1) .mapToPair(new PairFunction<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, Integer> call(Integer s) throws Exception { return new Tuple2<Integer, Integer>(s, s); } }); JavaPairRDD<Integer, Integer> rdd2 = rdd.repartitionAndSortWithinPartitions(new Partitioner() { private static final long serialVersionUID = 1L; @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { Integer p = Integer.valueOf(key.toString()); if(p%2==0) { return 0; }else { return 1; } } }); JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<String>>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iter) throws Exception { List<String> list = new ArrayList<>(); while(iter.hasNext()) { list.add(index+" "+iter.next()); } return list.iterator(); } }, true); rdd3.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); }
(2)结果
六、cogroup、sortBykey、aggregateByKey
6.1 cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
(1)使用Java编写
public static void cogroup() { List<Tuple2<Integer, String>> list1 = Arrays.asList( new Tuple2<Integer, String>(1, "张三"), new Tuple2<Integer, String>(2, "李四") ); List<Tuple2<Integer, String>> list2 = Arrays.asList( new Tuple2<Integer, String>(1, "王五"), new Tuple2<Integer, String>(2, "赵柳"), new Tuple2<Integer, String>(3, "田七") ); List<Tuple2<Integer, String>> list3 = Arrays.asList( new Tuple2<Integer, String>(1, "张三"), new Tuple2<Integer, String>(2, "李四"), new Tuple2<Integer, String>(3, "尾巴") ); JavaPairRDD<Integer, String> list1RDD = jsc.parallelizePairs(list1); JavaPairRDD<Integer, String> list2RDD = jsc.parallelizePairs(list2); JavaPairRDD<Integer, String> list3RDD = jsc.parallelizePairs(list3); list1RDD.cogroup(list2RDD,list3RDD).foreach(new VoidFunction<Tuple2<Integer,Tuple3<Iterable<String>,Iterable<String>,Iterable<String>>>>() { @Override public void call(Tuple2<Integer, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>> s) throws Exception { System.out.println(s); } }); }
(2)结果
6.2 sortBykey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions
中实现的,实现如下
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size) : RDD[(K, V)] = { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。参数主要有两个,一是指定升降序排序,默认是true,二是指定分区数,默认是父rdd的分区数。下面对sortByKey的使用进行说明:
(1)使用Java编写
public static void sortByKey() { JavaRDD<String> rdd = jsc.textFile("words"); JavaPairRDD<String, Integer> rdd2 = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }).mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); rdd3.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception { return new Tuple2<Integer, String>(t._2, t._1); } }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception { return new Tuple2<String, Integer>(t._2, t._1); } }).foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println(s); } }); }
(2)结果
6.3 aggregateByKey
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
(1)使用Java编写
public static void aggregateByKey() { List<Tuple2<String, Integer>> list = Arrays.asList( new Tuple2<>("cat", 10), new Tuple2<>("mouse",22), new Tuple2<>("mouse", 5), new Tuple2<>("cat", 10), new Tuple2<>("mouse",22), new Tuple2<>("dog", 5) ); JavaPairRDD<String, Integer> rdd = jsc.parallelizePairs(list,2); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<String,Integer>>, Iterator<Tuple2<String,Integer>>>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, Integer>> call(Integer index , Iterator<Tuple2<String, Integer>> iter) throws Exception { while(iter.hasNext()) { System.out.println("partitionIndex:"+index+"---->"+iter.next()); } return iter; } }, true).collect(); /* partitionIndex:0---->(cat,10) partitionIndex:0---->(mouse,22) partitionIndex:0---->(mouse,5) partitionIndex:1---->(cat,10) partitionIndex:1---->(mouse,22) partitionIndex:1---->(dog,5) 该数据源被分成两个partition */ /** * 该函数第一个参数为起始值,该值在一个函数执行时,会在每个key的values中加入该值 */ JavaPairRDD<String, Integer> rdd2 = rdd.aggregateByKey(100, new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; /* * 该函数统计同个partition内的数据 * */ @Override public Integer call(Integer v1, Integer v2) throws Exception { return Math.max(v1, v2); } }, new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; /** * 该函数统计不同partition内的数据 */ @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); rdd2.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println(s); } }); }
(2)结果
6.4 combineByKey
combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据的类型不同的返回值。
要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这个过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。
如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果合并。
(1)使用Java编写
public static void combineByKey() { List<Tuple2<String, Integer>> list = Arrays.asList( new Tuple2<>("晓红",11 ), new Tuple2<>("小绿",12 ), new Tuple2<>("小黑",13 ), new Tuple2<>("晓红",14 ), new Tuple2<>("小绿",15 ), new Tuple2<>("晓红",16 ) ); JavaPairRDD<String, Integer> rdd = jsc.parallelizePairs(list,2); rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<String,Integer>>, Iterator<Tuple2<String,Integer>>>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, Integer>> call(Integer index, Iterator<Tuple2<String, Integer>> iter) throws Exception { List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); while(iter.hasNext()){ Tuple2<String, Integer> next = iter.next(); System.out.println("partitionindex ="+index+",value="+next); list.add(next); } return list.iterator(); } }, true).collect(); /**数据分区结果: partitionindex =0,value=(晓红,11) partitionindex =0,value=(小绿,12) partitionindex =0,value=(小黑,13) partitionindex =1,value=(晓红,14) partitionindex =1,value=(小绿,15) partitionindex =1,value=(晓红,16) */ rdd.combineByKey(new Function<Integer, Integer>() { /** * 该方法与aggregateByKey相似(底层貌似是combineByKey) * 该方法的第一个参数的方法实现数据转化的功能,可以将收到的数据转化成任意类型的数据 */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer a) throws Exception { return a; } }, new Function2<Integer,Integer,Integer>() { /** * 第二个参数的方法处理来自同一个partition内的数据 */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer a, Integer b) throws Exception { return Math.max(a, b); } }, new Function2<Integer,Integer,Integer>() { /** * 第三个参数的方法处理来自不同partition的数据 */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer a, Integer b) throws Exception { return a+b; } }).foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> a) throws Exception { System.out.println(a); } }); }
(2)结果
七、zip相关算子