Spark 算子Java操作示例。

时间:2022-12-19 20:47:09

Spark中有许许多多的算子来支持各种操作,但其中主要分为两种;一种就是Transformations算子,一种是Action算子。官方文档中已经指出了常用的算子。好记性不如烂笔头,在这里记录一下各个算子的作用以及使用方法。

Transformations算子:顾名思义,这种算子的作用就是将一个RDD转换成另一种RDD,有的算子转换过程中还会涉及到parition的变化和Shuffle操作,这里只介绍算子的使用方法,其中的parition和shuffle的具体变换可能不会提到。

Action算子:该算子会触发一个runJob操作,也就是只有使用了Action算子才会将程序提交到集群进行计算,最后会得到一个结果。


SparkConf和JavaSparkContext的初始化。

SparkConf conf = new SparkConf().setMaster("local").setAppName("TransformationsOperator");
JavaSparkContext sc = new JavaSparkContext(conf);

1 . map(func)。

官方介绍:

Return a new distributed dataset formed by passing each element of the source through a function func.

通过函数将RDD中的每个元素进行转换形成一个新的RDD。

操作示例:

    // map,一次只处理一个parition中的一条数据。
private static void MapOperator(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);// 得到一个RDD

JavaRDD<String> results = numberRDD.map(new Function<Integer, String>() {//使用map操作将Integer类型转换成String

private static final long serialVersionUID = 1L;

@Override
public String call(Integer number) throws Exception {

return "number:" + number;
}

});

results.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;

@Override
public void call(String arg0) throws Exception {
System.out.println(arg0);
}
});
}

2.mapPartitions(func)。

官方介绍:

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.

作用于map一致,不过是以每个parition作为一个操作单位的,所以返回类型是一个Iterator。

操作示例:

// mapPartitions,这个是针对Partition的操作,一次会处理一个partition的所有数据
private static void MapPartitionsOperator(JavaSparkContext sc) {

List<String> names = Arrays.asList("zhangsan", "lisi", "wangwu");

JavaRDD<String> nameRDD = sc.parallelize(names,2);

final Map<String, Integer> scoreMap = new HashMap<>();
scoreMap.put("zhangsan", 100);
scoreMap.put("lisi", 99);
scoreMap.put("wangwu", 98);

//这里会使用FlatMapFunction将Iterator中的数据自动压缩成Integer数据。
JavaRDD<Integer> scoreRDD = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Iterator<Integer> call(Iterator<String> iterator) throws Exception {
List<Integer> scores = new ArrayList<>();
while (iterator.hasNext()) {
String name = iterator.next();
int score = scoreMap.get(name);
scores.add(score);
}
return scores.iterator();
}
});

scoreRDD.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Integer score) throws Exception {
System.out.println(score);
}
});

}

可以通过FlatMapFunction的参数看到,第一个是Iterator< String>的,也就是输入的数据是一个Iterator,输出的是Integer,这个输入的 Iterator就是将一个partition中的所有数据传入进来,经过操作后变成一个Iterator< Integer>的,然后在自动压缩成Integer。


3 . mapPartitionsWithIndex(func)。

官方介绍:

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.

与上述的mapParitions神似,不过每次调用call函数的时候会传入一个当前parition的下标进来。

操作示例:

// 可以看到使用了哪一个parition,采用分区的话:parallelize优先级最高,其次conf.set,最后时local[];
private static void MapPartitionsWithIndexOperator(JavaSparkContext sc) {
List<String> names = Arrays.asList("zhangsan", "lisi", "wangwu");

JavaRDD<String> nameRDD = sc.parallelize(names, 2);//这里加载的数据设置成2个partition。

JavaRDD<String> results = nameRDD
.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

private static final long serialVersionUID = 1L;
//这里会有一个Integer的index,可以通过这个来查看当前操作属于哪一个parition。
@Override
public Iterator<String> call(Integer index, Iterator<String> names) throws Exception {
List<String> nameList = new ArrayList<>();

while (names.hasNext()) {
String name = names.next();
name = index + ":" + name;
nameList.add(name);
}

return nameList.iterator();
}
}, true);

results.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;

@Override
public void call(String name) throws Exception {
System.out.println(name);
}
});

}

4.filter(func)。

官方介绍:

Return a new dataset formed by selecting those elements of the source on which func returns true.

也就是通过函数筛选出所需要的数据元素,返回true也代表保留,false代表抛弃。

操作示例:

// 过滤出一部分数据
private static void FilterOperator(JavaSparkContext sc) {
List<Integer> scores = Arrays.asList(43, 60, 59, 70, 81);
JavaRDD<Integer> scoresRDD = sc.parallelize(scores);

//筛选出分数小于60的。
JavaRDD<Integer> results = scoresRDD.filter(new Function<Integer, Boolean>() {

private static final long serialVersionUID = 1L;

@Override
public Boolean call(Integer score) throws Exception {

return score < 60;
}
});

results.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Integer score) throws Exception {
System.out.println(score);
}
});

}

5.coalesce(numPartitions)。

官方介绍:

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

将RDD中的partition进行减少,尤其是在上述的filter之后使用效果更好,因为filter会可能会过滤掉大量的数据从而导致一个partition中的数据量很少,这时候使用coalesce算子可以尽量的合并partition,一定程度少减少数据倾斜的问题。

操作示例:

// 将partition的数量减少
private static void CoalesceOperator(JavaSparkContext sc) {
List<String> students = Arrays.asList("stu1", "stu2", "stu3", "stu4", "stu5", "stu6");
JavaRDD<String> cls = sc.parallelize(students, 4);// 设置为四个partition

JavaRDD<String> temp = cls.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

private static final long serialVersionUID = 1L;

@Override
public Iterator<String> call(Integer index, Iterator<String> cls) throws Exception {
List<String> list = new ArrayList<>();

while (cls.hasNext()) {
String stu = cls.next();
stu = "1[" + index + "]" + stu;
list.add(stu);
}

return list.iterator();
}
}, true);

JavaRDD<String> temp2 = temp.coalesce(2);//将四个partition减少到两个

JavaRDD<String> result = temp2
.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

private static final long serialVersionUID = 1L;

@Override
public Iterator<String> call(Integer index, Iterator<String> cls) throws Exception {
List<String> list = new ArrayList<>();
while (cls.hasNext()) {
String stu = cls.next();
stu = "2[" + index + "]," + stu;
list.add(stu);
}
return list.iterator();
}
}, true);

result.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;

@Override
public void call(String stu) throws Exception {
System.out.println(stu);
}
});
}

上述的代码会将第一次运行时数据所在的partition的下标进行保存,然后将parition减少,再次运行将第二次的partition下标进行保存,方便对比查看。


6.repartition(numPartitions)。

官方介绍:

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

用来增加parition,并且会将其中的数据进行平衡操作,使用shuffle操作。

操作示例:

// 增加Partition,使用shuffle操作
private static void RepartitionOperator(JavaSparkContext sc) {
List<String> students = Arrays.asList("stu1", "stu2", "stu3", "stu4", "stu5", "stu6");
JavaRDD<String> cls = sc.parallelize(students, 2);// 设置为两个partition

JavaRDD<String> temp = cls.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

private static final long serialVersionUID = 1L;

@Override
public Iterator<String> call(Integer index, Iterator<String> cls) throws Exception {
List<String> list = new ArrayList<>();

while (cls.hasNext()) {
String stu = cls.next();
stu = "1[" + index + "]" + stu;
list.add(stu);
}

return list.iterator();
}
}, true);

JavaRDD<String> temp2 = temp.repartition(3);//增加到三个

JavaRDD<String> result = temp2
.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

private static final long serialVersionUID = 1L;

@Override
public Iterator<String> call(Integer index, Iterator<String> cls) throws Exception {
List<String> list = new ArrayList<>();
while (cls.hasNext()) {
String stu = cls.next();
stu = "2[" + index + "]," + stu;
list.add(stu);
}
return list.iterator();
}
}, true);

result.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;

@Override
public void call(String stu) throws Exception {
System.out.println(stu);
}
});

}

与上述的一样,将两次运行的partition的下标进行保存,方便对比。


7 . flatMap(func)。

官方介绍:

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

与最开始介绍的map类似,不过map每次操作一个数据并且返回一个数据,但时flatMap可能会返回多个数据。

操作示例:

// 每次對傳進來的一行數據進行單詞的切割
private static void FlatMapOperator(JavaSparkContext sc) {
List<String> words = Arrays.asList("hello ha","nihao haha","hello hao");
JavaRDD<String> wordRDD = sc.parallelize(words);

JavaRDD<String> result = wordRDD.flatMap(new FlatMapFunction<String, String>() {

@Override
public Iterator<String> call(String line) throws Exception {

//按照空格将每次传进来的数据进行分割并返回。
return Arrays.asList(line.split(" ")).iterator();
}
});

result.foreach(new VoidFunction<String>() {

@Override
public void call(String word) throws Exception {
System.out.println(word);
}
});

}

8 . collect()。

官方介绍:

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

将集群中的其他节点(如果有的话)的数据pull到driver所在的机器上,如果数据量过大的话可能会造成内存溢出的现象,所以官方的建议就是返回的数据量小的话会很有用。

操作示例:

//foreach在从节点进行的遍历,collect会从集群中把数据pull到driver所在的机器上
private static void CollectOperator(JavaSparkContext sc) {
List<Integer> numberList = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> numberRDD = sc.parallelize(numberList);

JavaRDD<Integer> temp = numberRDD.map(new Function<Integer, Integer>() {

@Override
public Integer call(Integer arg0) throws Exception {

return arg0 * 2;
}
});

List<Integer> result = temp.collect();

for (Integer num : result) {
System.out.println(num);
}


}

9 . count()。

官方介绍:

Return the number of elements in the dataset.

比较简单了,就是统计一下RDD中存在多少数据量。

操作示例:

    // 统计一下RDD里面有多少数据
private static void CountOperator(JavaSparkContext sc) {
List<String> stu = Arrays.asList("stu1","stu2","stu3","stu4","stu5");
JavaRDD<String> stuRDD = sc.parallelize(stu);

long count = stuRDD.count();

System.out.println(count);

}

10 . groupByKey([numTasks])。

官方介绍:

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

作用就是将RDD中根据Key进行分组操作,所有Key对应的是一个Iterable。第一个Note介绍的就是说reduceByKey或者aggregateByKey的性能要比这个groupByKey 的性能好:

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。 只有在reduceByKey处理不了时,才用groupByKey().map()来替代。 因为reduceBykey聚合后传输的数据量就变少了,而groupBykey没聚合会传递到taskResult上面数据量比较大。

更好的解释可以看一下这个博客:http://blog.csdn.net/zongzhiyuan/article/details/49965021

第二个Note说的是并行度的问题(通俗的就是task),注意到groupByKey可以跟着一个参数,这个参数可以决定下面的操作时候的并行度,如果没有设置的话,就默认为父RDD的并行度,如果设置了话就按照参数的来进行分配,并且下面的task也会变成该参数对应的并行度。

操作示例:

// 按照key进行分组
private static void GroupByKeyOperator(JavaSparkContext sc) {
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("zhangsan", 100),
new Tuple2<String,Integer>("zhangsan", 50),
new Tuple2<String,Integer>("lisi", 99),
new Tuple2<String,Integer>("wangwu", 120),
new Tuple2<String,Integer>("wangwu", 30));

JavaPairRDD<String, Integer> scoreRDD = sc.parallelizePairs(scoreList,2);//设置为两个partition

JavaPairRDD<String, Iterable<Integer>> result = scoreRDD.groupByKey(3);

//此时的并行度依然为3,如果不传入参数的话就使用的是父RDD也就是scoreRDD的并行度,也就是2
result.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Iterable<Integer>> score) throws Exception {
System.out.println(score._1 + " " + score._2);
}
});

}

11 . reduce(func)。

官方介绍

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

每次传入两个参数通过函数func得到一个返回值,然后使用该值继续与后面的数进行调用func,直到所有的数据计算完成,最后返回一个计算结果。

操作示例:

    // 传入两个个参数并返回一个结果
private static void ReduceOperator(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer num1, Integer num2) throws Exception {

return num1 + num2;
}
});

System.out.println(sum);

}

12 . reduceByKey(func, [numTasks])。

官方介绍:

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

简单的说就是groupByKey + reduce。先按照Key进行分组,然后将每组的Key进行reduce操作,得到一个Key对应一个Value的RDD。第二个参数就是指定使用多少task来执行reduce操作。

操作示例:

//reduceByKey = groupByKey + reduce 
private static void ReduceByKeyOperator(JavaSparkContext sc) {
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("zhangsan", 100),
new Tuple2<String,Integer>("zhangsan", 50),
new Tuple2<String,Integer>("lisi", 99),
new Tuple2<String,Integer>("wangwu", 120),
new Tuple2<String,Integer>("wangwu", 30));

JavaPairRDD<String, Integer> scoreRDD = sc.parallelizePairs(scoreList);

scoreRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer score1, Integer score2) throws Exception {
return score1 + score2;
}
},2).foreach(new VoidFunction<Tuple2<String,Integer>>() {


private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Integer> score) throws Exception {
System.out.println("name:" + score._1 + " score:" + score._2);
}
});

}

13 . sample(withReplacement, fraction, seed)。

官方介绍:

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

对RDD中的数据进行随机取样操作,sample第一个参数代表产生的样本数据是否可以重复,第二个参数代表取样的比例,第三个数值代表一个随机数种子,如果传入一个常数,那么每次取样结果会一样。

操作示例:

// 随机从RDD中取样
private static void SampleOperator(JavaSparkContext sc) {
List<String> stu = Arrays.asList("stu1","stu2","stu3","stu4","stu5","stu6");
JavaRDD<String> stuRDD = sc.parallelize(stu);

// 第一个参数决定取样结果是否可重复,第二个参数决定取多少比例的数据,第三个是自定义的随机数种子,如果传入一个常数则每次产生的值一样
stuRDD.sample(false, 0.5).foreach(new VoidFunction<String>() {


private static final long serialVersionUID = 1L;

@Override
public void call(String arg0) throws Exception {
System.out.println(arg0);
}
});

}

14 . take(n)。

官方介绍:

Return an array with the first n elements of the dataset.

这个比较简单,就是将RDD中的前多少数据返回过来,返回结果为数组形式。

操作示例:

    // 取出RDD中的前多少数据
private static void TakeOperator(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1,2,3,4,5);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

List<Integer> nums = numberRDD.take(3);

for (Integer num : nums) {
System.out.println(num);
}

}

15 . takeSample(withReplacement, num, [seed])。

官方介绍:

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

相当于是先进行sample然后进行take操作。

操作示例:

    //先 sample,再take
private static void TakeSampleOperator(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1,2,3,4,5);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

List<Integer> nums = numberRDD.takeSample(false, 2);

for (Integer num : nums) {
System.out.println(num);
}

}

16 . union(otherDataset)。

官方介绍:

Return a new dataset that contains the union of the elements in the source dataset and the argument.

返回两个RDD中的并集(但并不会去重),并且parition也会合并,也就是并行度会发生改变。

操作示例:

    // Union,将两个RDD组合起来返回一个新的RDD,partition也合并
private static void UnionOperator(JavaSparkContext sc) {
List<String> names1 = Arrays.asList("stu1","stu2","stu3");
List<String> names2 = Arrays.asList("stu1","stu5","stu6");

JavaRDD<String> nameRDD1 = sc.parallelize(names1,2);//两个parition
JavaRDD<String> nameRDD2 = sc.parallelize(names2);//一个partition

nameRDD1.union(nameRDD2).foreach(new VoidFunction<String>() {//此时由三个parition,也就有三个task

private static final long serialVersionUID = 1L;

@Override
public void call(String name) throws Exception {
System.out.println(name);
}
});

}

17 . distinct([numTasks]))。

官方介绍:

Return a new dataset that contains the distinct elements of the source dataset.

简单的去重操作。

操作示例:

    // 去重
private static void DistinctOperator(JavaSparkContext sc) {
List<String> stu = Arrays.asList("wangwu","lisi","zhaoliu","lisi");

JavaRDD<String> stuRDD = sc.parallelize(stu);

stuRDD.distinct().foreach(new VoidFunction<String>() {


private static final long serialVersionUID = 1L;

@Override
public void call(String stu) throws Exception {
System.out.println(stu);
}
});

}

18 . sortByKey([ascending], [numTasks])。

官方介绍:

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

根据Key进行排序操作,如果第一个参数为true,则结果为升序,反之为降序。第二个参数就是决定执行的task数目。

操作示例:

//根据key排序
private static void SortByKeyOperator(JavaSparkContext sc) {
List<Tuple2<Integer, String>> stus = Arrays.asList(
new Tuple2<Integer, String>(10, "lisi"),
new Tuple2<Integer, String>(20, "wangwu"),
new Tuple2<Integer, String>(10, "zhaoliu"),
new Tuple2<Integer, String>(30, "zhangsan"));

JavaPairRDD<Integer, String> stusRDD = sc.parallelizePairs(stus);

stusRDD.sortByKey(true,2).foreach(new VoidFunction<Tuple2<Integer,String>>() {


private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<Integer, String> stu) throws Exception {
System.out.println("name:" + stu._2 + ",score:" + stu._1);
}
});

}

19 . saveAsTextFile(path)。

官方介绍:

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

将RDD保存在文件系统上,Spark会调用元素的toString方法作为一行数据。

操作示例:

    //将RDD中的数据进行保存
private static void SaveAsTextFileOperator(JavaSparkContext sc) {
List<Integer> numbers = Arrays.asList(1,2,3,4,5);

JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

JavaRDD<Integer> result = numberRDD.map(new Function<Integer, Integer>() {


private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer number) throws Exception {

return number * 2;
}
});

//result.saveAsTextFile("./SaveAsTextFileOperator");//保存在当前目录下
result.saveAsTextFile("hdfs://xxx.xx.xx.xx:xxxx/testSaveAsTextFile");//保存在HDFS上

}

会生成一个testSaveAsTextFile文件夹,如果文件夹存在则抛出异常。


20 . intersection(otherDataset)。

官方介绍:

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

作用就是将两个RDD求交集,当然也进行了去重操作。

操作示例:

// 求交集并去重
private static void IntersectionOperator(JavaSparkContext sc) {
List<String> stus1 = Arrays.asList("stu1","stu2","stu2");
List<String> stus2 = Arrays.asList("stu2","stu3","stu3");

JavaRDD<String> stuRDD1 = sc.parallelize(stus1);
JavaRDD<String> stuRDD2 = sc.parallelize(stus2);

stuRDD1.intersection(stuRDD2).foreach(new VoidFunction<String>() {

@Override
public void call(String stu) throws Exception {
System.out.println(stu);
}
});

}

21 . cartesian(otherDataset)。

官方介绍:

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

相当于进行了一次笛卡尔积的计算,将两个RDD中的数据一一对应起来。

操作示例:

    // 笛卡尔积
private static void CartesianOperator(JavaSparkContext sc) {
List<String> hero = Arrays.asList("张飞","貂蝉","吕布");
List<String> skill = Arrays.asList("闪现","斩杀","眩晕");

JavaRDD<String> heroRDD = sc.parallelize(hero);
JavaRDD<String> skillRDD = sc.parallelize(skill);

heroRDD.cartesian(skillRDD).foreach(new VoidFunction<Tuple2<String,String>>() {

@Override
public void call(Tuple2<String, String> arg0) throws Exception {
System.out.println(arg0);
}
});

}

22 . countByKey()。

官方介绍:

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

只能用在(K,V)类型,用来统计每个key的数据有多少个,返回一个(K,Int)。

操作示例:

// 根据Key进行统计
private static void CountByKeyOperator(JavaSparkContext sc) {
List<Tuple2<String, String>> stus = Arrays.asList(
new Tuple2<String, String>("class1", "stu1"),
new Tuple2<String, String>("class1", "stu2"),
new Tuple2<String, String>("class2", "stu3"),
new Tuple2<String, String>("class1", "stu4"));

JavaPairRDD<String, String> stuRDD = sc.parallelizePairs(stus);

Map<String, Long> result = stuRDD.countByKey();

for (Map.Entry<String, Long> map : result.entrySet()) {
System.out.println(map.getKey() + " " + map.getValue());
}

}

23 . first()。

官方介绍:

Return the first element of the dataset (similar to take(1)).

取出第一个,跟take(1)相似。

操作示例:

    // 取出第一个元素
private static void FirstOperator(JavaSparkContext sc) {
List<String> stus = Arrays.asList("stu1","stu2","stu3");

JavaRDD<String> stuRDD = sc.parallelize(stus);

String firstStu = stuRDD.first();

System.out.println(firstStu);

}

24 . cogroup(otherDataset, [numTasks])。

官方介绍:

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.

将两个RDD按照Key进行汇总,第一个RDD中的Key对应的数据放在一个Iterable中,第二个RDD中同样的Key对应的数据放在一个Iterable中,最后得到一个Key,对应两个Iterable的数据。第二个参数就是指定task数量。

操作示例:

// 按照Key进行分类汇总
private static void CogroupOperator(JavaSparkContext sc) {
List<Tuple2<String, String>> stus = Arrays.asList(
new Tuple2<String, String>("stu1", "zhangsan"),
new Tuple2<String, String>("stu2", "lisi"),
new Tuple2<String, String>("stu3", "lisi"),
new Tuple2<String, String>("stu2", "wangwu"),
new Tuple2<String, String>("stu2", "lisi"));

List<Tuple2<String, String>> scores = Arrays.asList(
new Tuple2<String, String>("stu1", "90"),
new Tuple2<String, String>("stu1", "100"),
new Tuple2<String, String>("stu2", "80"),
new Tuple2<String, String>("stu3", "120"));

JavaPairRDD<String, String> stuRDD = sc.parallelizePairs(stus);
JavaPairRDD<String, String> scoreRDD = sc.parallelizePairs(scores);

JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> result = stuRDD.cogroup(scoreRDD);

result.foreach(new VoidFunction<Tuple2<String,Tuple2<Iterable<String>,Iterable<String>>>>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> result) throws Exception {
System.out.println(result._1);//第一个Tuple2的Key
System.out.println(result._2._1);//第一个Tuple2的Vale
System.out.println(result._2._2);//第二个Tuple2的Value
System.out.println();
}
});

}

输出结果:

stu3
[lisi]
[120]

stu2
[lisi, wangwu, lisi]
[80]

stu1
[zhangsan]
[90, 100]

25 . join(otherDataset, [numTasks])。

官方简介:

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

同样的也是按照Key将两个RDD中进行汇总操作,不过会对每个Key所对应的两个RDD中的数据进行笛卡尔积计算。

操作示例:

//按照Key进行分类汇总,并且做笛卡尔积
private static void JoinOperator(JavaSparkContext sc) {
List<Tuple2<String, String>> stus = Arrays.asList(
new Tuple2<String, String>("stu1", "zhangsan"),
new Tuple2<String, String>("stu2", "lisi"),
new Tuple2<String, String>("stu3", "lisi"),
new Tuple2<String, String>("stu2", "wangwu"),
new Tuple2<String, String>("stu2", "lisi"));

List<Tuple2<String, String>> scores = Arrays.asList(
new Tuple2<String, String>("stu1", "90"),
new Tuple2<String, String>("stu1", "100"),
new Tuple2<String, String>("stu2", "80"),
new Tuple2<String, String>("stu3", "120"));

JavaPairRDD<String, String> stuRDD = sc.parallelizePairs(stus);
JavaPairRDD<String, String> scoreRDD = sc.parallelizePairs(scores);

JavaPairRDD<String, Tuple2<String, String>> result = stuRDD.join(scoreRDD);

result.foreach(new VoidFunction<Tuple2<String,Tuple2<String,String>>>() {


private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Tuple2<String, String>> result) throws Exception {
System.out.println(result._1);
System.out.println(result._2._1);
System.out.println(result._2._2);
System.out.println();
}
});

}

输出结果:

stu3
lisi
120
=================

stu2
lisi
80
=================

stu2
wangwu
80
=================

stu2
lisi
80
=================

stu1
zhangsan
90
=================

stu1
zhangsan
100
=================