Spark Basics 6: Key-Value Pair Operations sortByKey, groupByKey, groupBy, cogroup

sortByKey

As the name suggests, this sorts a PairRDD by key. Given a PairRDD like ["A":1, "C":4, "B":3, "B":5], sorting by key yields A, B, C. Note that this method sorts only the keys; the values are not sorted.

On to the code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Sorts a PairRDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // get a JavaSparkContext from the existing SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        // true sorts ascending, false sorts descending
        System.out.println(originRDD.sortByKey(true).collect());
        System.out.println(originRDD.sortByKey(false).collect());
    }
}

The result is:

[(A,10), (A,6), (B,1), (B,3), (C,5)]

[(C,5), (B,1), (B,3), (A,10), (A,6)]
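
sortByKey also has overloads that take a custom Comparator, for cases where the key's natural ordering isn't what you want. Here is a minimal sketch that reuses originRDD from the example above; note that the comparator gets shipped to the executors, so the lambda is cast to be Serializable:

// reuses originRDD from the example above; additionally needs
// import java.io.Serializable; and import java.util.Comparator;
JavaPairRDD<String, Integer> byKeyDesc = originRDD.sortByKey(
        (Comparator<String> & Serializable) (a, b) -> b.compareTo(a));
System.out.println(byKeyDesc.collect());
// should print [(C,5), (B,1), (B,3), (A,10), (A,6)], same as sortByKey(false)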

groupByKey

Similar to GROUP BY in MySQL: it groups records by key, producing an RDD of the form [key, Iterable[value]], i.e. each key's values are collected into an Iterable.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Groups a PairRDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // get a JavaSparkContext from the existing SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);

        System.out.println(originRDD.groupByKey().collect());
    }
}

The result is [(B,[1, 3]), (A,[10, 6]), (C,[5])]
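
One caveat: groupByKey shuffles every value across the network. If all you need downstream is a per-key aggregate rather than the values themselves, reduceByKey is usually the better choice, since it combines values within each partition before shuffling. A minimal sketch reusing originRDD from the example above:

// reuses originRDD from the example above
JavaPairRDD<String, Integer> sums = originRDD.reduceByKey((a, b) -> a + b);
System.out.println(sums.collect());
// should print the per-key sums, e.g. [(B,4), (A,16), (C,5)] (order may vary)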

groupBy

Similar to groupByKey, except that groupByKey explicitly groups by the key, so it can only be called on a PairRDD. groupBy, by contrast, has no built-in notion of what to group by; we supply the grouping rule ourselves. So groupBy takes a function, and that function's return value is used as the key.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // get a JavaSparkContext from the existing SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(1);
        data.add(6);
        data.add(5);
        data.add(3);

        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        // the grouping function's return value ("even" or "odd") becomes the key
        Map<String, Iterable<Integer>> map = originRDD.groupBy(x -> {
            if (x % 2 == 0) {
                return "even";
            } else {
                return "odd";
            }
        }).collectAsMap();

        System.out.println(map);
    }
}

The result is {odd=[1, 5, 3], even=[10, 6]}

The function passed in simply tests whether each element is odd or even.
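
Note that groupBy returns a JavaPairRDD, so instead of collecting right away you can keep chaining pair operations on it. A minimal sketch reusing originRDD from the example above, counting the elements in each parity bucket with mapValues:

// reuses originRDD from the example above; needs import java.util.Map;
Map<String, Integer> counts = originRDD
        .groupBy(x -> x % 2 == 0 ? "even" : "odd")
        .mapValues(values -> {
            int n = 0;
            for (Integer v : values) {
                n++;
            }
            return n;
        })
        .collectAsMap();
System.out.println(counts); // should print {odd=3, even=2}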

cogroup

This is an upgraded version of groupByKey. groupByKey gathers the values sharing a key within a single RDD into one collection.

cogroup, by contrast, works across multiple RDDs, gathering same-keyed values into a collection of collections. For example, RDD1.cogroup(RDD2, RDD3, …RDDN) yields (key, Iterable[value1], Iterable[value2], Iterable[value3], …, Iterable[valueN]). Note that in the Java API, cogroup accepts at most three other RDDs.

Here's the code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // get a JavaSparkContext from the existing SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<Tuple2<String, Integer>> rdd1 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("B", 40)));
        JavaRDD<Tuple2<String, Integer>> rdd2 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 100),
                new Tuple2<>("B", 200),
                new Tuple2<>("A", 300),
                new Tuple2<>("B", 400)));
        JavaRDD<Tuple2<String, Integer>> rdd3 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 1000),
                new Tuple2<>("B", 2000),
                new Tuple2<>("A", 3000),
                new Tuple2<>("B", 4000)));

        // convert the tuple RDDs into PairRDDs so cogroup can be applied
        JavaPairRDD<String, Integer> pairRDD1 = JavaPairRDD.fromJavaRDD(rdd1);
        JavaPairRDD<String, Integer> pairRDD2 = JavaPairRDD.fromJavaRDD(rdd2);
        JavaPairRDD<String, Integer> pairRDD3 = JavaPairRDD.fromJavaRDD(rdd3);
        // one Iterable per source RDD, grouped under each key
        JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> pairRDD = pairRDD1.cogroup(pairRDD2, pairRDD3);
        System.out.println(pairRDD.collect());
    }
}

The result is:

[(B,([20, 40],[200, 400],[2000, 4000])), (A,([10, 30],[100, 300],[1000, 3000]))]
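
One thing the example above doesn't show: when a key exists in only some of the RDDs, cogroup still emits that key, with an empty Iterable on the missing side (unlike join, which would drop it). A minimal sketch with hypothetical two-RDD data, using parallelizePairs as a shortcut for parallelize plus fromJavaRDD:

// hypothetical data; reuses javaSparkContext from the example above
JavaPairRDD<String, Integer> left = javaSparkContext.parallelizePairs(Arrays.asList(
        new Tuple2<>("A", 1), new Tuple2<>("B", 2)));
JavaPairRDD<String, Integer> right = javaSparkContext.parallelizePairs(Arrays.asList(
        new Tuple2<>("A", 100), new Tuple2<>("C", 300)));
System.out.println(left.cogroup(right).collect());
// should print something like [(B,([2],[])), (A,([1],[100])), (C,([],[300]))]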