SortByKey
As the name suggests, this operation sorts a pair RDD by key. For example, given a PairRDD like ["A":1, "C":4, "B":3, "B":5], sorting by key gives the key order A, B, C. Note that only the keys are sorted; the values are not.
Here is the code:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Sorts a pair RDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));
        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        // true sorts ascending, false sorts descending
        System.out.println(originRDD.sortByKey(true).collect());
        System.out.println(originRDD.sortByKey(false).collect());
    }
}
The output is:
[(A,10), (A,6), (B,1), (B,3), (C,5)]
[(C,5), (B,1), (B,3), (A,10), (A,6)]
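Since sortByKey only looks at the key, a common workaround for sorting by value is to swap key and value with mapToPair first, call sortByKey, and optionally swap back. Below is a minimal sketch of that idea, reusing the originRDD from the example above; the variable names are just for illustration.

// A minimal sketch: sort by value by swapping (key, value) before sortByKey.
// Assumes the same originRDD as in the example above.
JavaPairRDD<Integer, String> swapped =
        originRDD.mapToPair(t -> new Tuple2<>(t._2(), t._1()));
// Sort descending by the former value, then swap back to (key, value)
JavaPairRDD<String, Integer> sortedByValue =
        swapped.sortByKey(false).mapToPair(t -> new Tuple2<>(t._2(), t._1()));
System.out.println(sortedByValue.collect());
// Expected order: [(A,10), (A,6), (C,5), (B,3), (B,1)]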
GroupByKey
Similar to GROUP BY in MySQL, groupByKey groups records by key, producing an RDD of the form RDD[key, Iterable[value]], i.e. all values with the same key are collected into one collection.
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Groups a pair RDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));
        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        // Collect all values with the same key into an Iterable
        System.out.println(originRDD.groupByKey().collect());
    }
}
The output is: [(B,[1, 3]), (A,[10, 6]), (C,[5])]
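Because each value is now an ordinary Iterable, you can process the groups directly. Here is a small, hedged sketch (reusing the originRDD above; the variable names are illustrative) that sums the grouped values per key with mapValues:

// A small sketch: sum each key's grouped values with mapValues.
// Assumes the originRDD defined in the example above.
JavaPairRDD<String, Integer> sums = originRDD.groupByKey().mapValues(values -> {
    int total = 0;
    for (Integer v : values) {
        total += v;
    }
    return total;
});
System.out.println(sums.collect());
// Expected something like: [(B,4), (A,16), (C,5)]

For pure aggregations like this, reduceByKey is generally preferred over groupByKey, because it combines values on the map side instead of shipping every value across the shuffle.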
GroupBy
Similar to groupByKey, except that groupByKey explicitly groups by the key, so it can only be applied to a pair RDD. groupBy, by contrast, has no built-in notion of what to group by; the grouping rule has to be supplied by us. Its parameter is therefore a function, and that function's return value is used as the key.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(1);
        data.add(6);
        data.add(5);
        data.add(3);
        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        // The function's return value ("even" or "odd") becomes the grouping key
        Map<String, Iterable<Integer>> map = originRDD.groupBy(x -> {
            if (x % 2 == 0) {
                return "even";
            } else {
                return "odd";
            }
        }).collectAsMap();
        System.out.println(map);
    }
}
The output is: {odd=[1, 5, 3], even=[10, 6]}
The function passed as the parameter simply checks whether each number is even or odd.
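Because the function's return value becomes the key, groupBy can also imitate the earlier groupByKey example by extracting the key from each tuple. A rough sketch under that assumption (reusing javaSparkContext from above):

// A rough sketch: groupBy on an RDD of tuples, using each tuple's first element as the key.
JavaRDD<Tuple2<String, Integer>> tupleRDD = javaSparkContext.parallelize(Arrays.asList(
        new Tuple2<>("A", 10), new Tuple2<>("B", 1), new Tuple2<>("A", 6),
        new Tuple2<>("C", 5), new Tuple2<>("B", 3)));
System.out.println(tupleRDD.groupBy(t -> t._1()).collect());
// Expected something like: [(B,[(B,1), (B,3)]), (A,[(A,10), (A,6)]), (C,[(C,5)])]

Note that, unlike groupByKey, the grouped values here are the whole tuples rather than just the integers, because groupBy keeps the original elements as the values.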
cogroup
cogroup is an upgraded version of groupByKey. groupByKey collects the values that share the same key within a single RDD into one collection.
cogroup does the same across multiple RDDs, producing a collection of collections per key: for example, RDD1.cogroup(RDD2, RDD3, …, RDDN) yields (key, Iterable[value1], Iterable[value2], Iterable[value3], …, Iterable[valueN]).
Here is the code:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        JavaRDD<Tuple2<String, Integer>> rdd1 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 10), new Tuple2<>("B", 20), new Tuple2<>("A", 30), new Tuple2<>("B", 40)));
        JavaRDD<Tuple2<String, Integer>> rdd2 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 100), new Tuple2<>("B", 200), new Tuple2<>("A", 300), new Tuple2<>("B", 400)));
        JavaRDD<Tuple2<String, Integer>> rdd3 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 1000), new Tuple2<>("B", 2000), new Tuple2<>("A", 3000), new Tuple2<>("B", 4000)));
        JavaPairRDD<String, Integer> pairRDD1 = JavaPairRDD.fromJavaRDD(rdd1);
        JavaPairRDD<String, Integer> pairRDD2 = JavaPairRDD.fromJavaRDD(rdd2);
        JavaPairRDD<String, Integer> pairRDD3 = JavaPairRDD.fromJavaRDD(rdd3);
        // For each key, collect the values from all three RDDs into three Iterables
        JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> pairRDD =
                pairRDD1.cogroup(pairRDD2, pairRDD3);
        System.out.println(pairRDD.collect());
    }
}
The output is:
[(B,([20, 40],[200, 400],[2000, 4000])), (A,([10, 30],[100, 300],[1000, 3000]))]
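One detail worth noting: cogroup keeps every key that appears in any of the input RDDs, and an RDD that lacks a key contributes an empty Iterable for it. A small sketch with two RDDs whose keys only partially overlap (reusing javaSparkContext from above; the variable names are illustrative):

// A small sketch: keys present in only one RDD still appear in the cogroup result,
// paired with an empty Iterable on the side that lacks them.
JavaPairRDD<String, Integer> left = javaSparkContext.parallelizePairs(Arrays.asList(
        new Tuple2<>("A", 1), new Tuple2<>("B", 2)));
JavaPairRDD<String, Integer> right = javaSparkContext.parallelizePairs(Arrays.asList(
        new Tuple2<>("B", 20), new Tuple2<>("C", 30)));
System.out.println(left.cogroup(right).collect());
// Expected something like: [(B,([2],[20])), (A,([1],[])), (C,([],[30]))]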