Simple usage of common Spark operators

Date: 2023-01-11 20:47:08

Scala version

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2017/10/21.
 */
object TransFormation {

  def main(args: Array[String]): Unit = {
    //map()
    //filter()
    //flatMap()
    //groupByKey()
    //reduceByKey()
    //sortByKey()
    //join()
    //union()
    //intersection()
    //distinct()
    cartesian()
  }

  // Cartesian product of two RDDs
  def cartesian(): Unit = {
    val conf = new SparkConf().setAppName("cartesian").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7)
    val list1 = List("a", "b", "c", "d")
    sc.parallelize(list).cartesian(sc.parallelize(list1)).foreach(t => println(t._1 + "\t" + t._2))
  }

  // remove duplicate elements
  def distinct(): Unit = {
    val conf = new SparkConf().setAppName("distinct").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 1, 1, 2, 2, 3, 4, 3, 5, 6, 4, 5, 7)
    sc.parallelize(list).distinct().foreach(println(_))
  }

  // elements present in both RDDs
  def intersection(): Unit = {
    val conf = new SparkConf().setAppName("intersection").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6)
    val list1 = List(4, 5, 6, 7, 8, 9)
    sc.parallelize(list).intersection(sc.parallelize(list1)).foreach(println(_))
  }

  // concatenate two RDDs
  def union(): Unit = {
    val conf = new SparkConf().setAppName("union").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4)
    val list1 = List(5, 6, 7, 8)
    sc.parallelize(list).union(sc.parallelize(list1)).foreach(println(_))
  }

  // inner join on the key
  def join(): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)
    val list1 = List((1, "hadoop"), (2, "spark"), (3, "hbase"))
    val list2 = List((1, "had"), (2, "spa"), (3, "hba"))
    sc.parallelize(list1).join(sc.parallelize(list2)).foreach(t => println(t._1 + "\t" + t._2._1 + "\t" + t._2._2))
  }

  // sort by key in ascending order
  def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((6, "hadoop"), (8, "spark"), (10, "hbase"))
    sc.parallelize(list).sortByKey().foreach(t => println(t._1 + "\t" + t._2))
  }

  // aggregate the values of each key
  def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(("hadoop", 111), ("spark", 222), ("hadoop", 333), ("spark", 444))
    sc.parallelize(list).reduceByKey(_ + _).foreach(t => println(t._1 + "\t" + t._2))
  }

  // group the values of each key
  def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List((1, "docu"), (2, "idontknow"))
    sc.parallelize(list).groupByKey().foreach(t => println(t._1 + "\t" + t._2))
  }

  // split each element into multiple output elements
  def flatMap(): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("you,jump", "i,jump")
    sc.parallelize(list).flatMap(_.split(",")).foreach(println(_))
  }

  // keep only the even numbers
  def filter(): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    sc.parallelize(list).filter(_ % 2 == 0).foreach(println(_))
  }

  // transform each element one-to-one
  def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List("hadoop", "spark", "hive")
    sc.parallelize(list).map("hello" + _).foreach(x => println(x))
  }
}
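Each method above exercises a single operator in isolation. In practice these transformations are chained; the following sketch is my own addition rather than part of the original post. It assumes the same local-mode setup and strings flatMap, map, reduceByKey and sortByKey together into a small word count (the object name ChainedWordCount and the sample lines are made up for illustration).

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: chaining several of the operators shown above into one pipeline.
object ChainedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("chainedWordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = List("you,jump", "i,jump", "you,run")
    sc.parallelize(lines)
      .flatMap(_.split(","))        // split each line into words
      .map(word => (word, 1))       // pair every word with a count of 1
      .reduceByKey(_ + _)           // sum the counts per word
      .map(_.swap)                  // (word, count) -> (count, word) so we can sort by the count
      .sortByKey(ascending = false) // largest counts first
      .foreach(t => println(t._2 + "\t" + t._1))
    sc.stop()
  }
}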

Java version

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by Administrator on 2017/10/21.
 */
public class TransFormationOperator {

    public static void main(String[] args) {
        //map();
        //filter();
        //flatMap();
        //groupByKey();
        //reduceByKey();
        //sortByKey();
        //join();
        //union();
        //intersection();
        //distinct();
        //cartesian();
    }
    // Cartesian product
    public static void cartesian() {
        SparkConf conf = new SparkConf().setAppName("cartesian").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5);
        List<String> list2 = Arrays.asList("hadoop", "spark", "hive", "hbase");
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);
        list1RDD.cartesian(list2RDD).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + "<==>" + t._2);
            }
        });
    }
    // remove duplicates
    public static void distinct() {
        SparkConf conf = new SparkConf().setAppName("distinct").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("hadoop", "hadoop", "hadoop", "spark", "spark", "hbase");
        JavaRDD<String> listRDD = sc.parallelize(list);
        JavaRDD<String> distinct = listRDD.distinct();
        distinct.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    // intersection of two RDDs
    public static void intersection() {
        SparkConf conf = new SparkConf().setAppName("intersection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        final List<String> list1 = Arrays.asList("hadoop", "spark", "hbase", "hive");
        final List<String> list2 = Arrays.asList("hbase", "hive", "zookeeper");

        JavaRDD<String> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);

        JavaRDD<String> intersection = list1RDD.intersection(list2RDD);
        intersection.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
    // union of two RDDs
    public static void union() {
        SparkConf conf = new SparkConf().setAppName("union").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list1 = Arrays.asList("hadoop", "spark", "hbase", "hive");
        List<String> list2 = Arrays.asList("hive", "sqoop", "akka");

        JavaRDD<String> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);

        JavaRDD<String> union = list1RDD.union(list2RDD);
        union.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    // inner join on the key
    public static void join() {
        SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(11, "mayun"),
                new Tuple2<Integer, String>(22, "mahuateng"),
                new Tuple2<Integer, String>(33, "zll"));

        List<Tuple2<Integer, String>> list2 = Arrays.asList(
                new Tuple2<Integer, String>(11, "alibaba"),
                new Tuple2<Integer, String>(22, "tenxun"),
                new Tuple2<Integer, String>(33, "zsjituan")
        );
        JavaPairRDD<Integer, String> list1RDD = sc.parallelizePairs(list1);
        JavaPairRDD<Integer, String> list2RDD = sc.parallelizePairs(list2);

        JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = list1RDD.join(list2RDD);
        joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, String>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, String>> t) throws Exception {
                System.out.println(t._1 + "<==>" + t._2._1 + "<==>" + t._2._2);
            }
        });
    }

    // sort by key in ascending order
    public static void sortByKey() {
        final SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);

        final List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<Integer, String>(77, "hadoop"),
                new Tuple2<Integer, String>(44, "spark"),
                new Tuple2<Integer, String>(55, "hive"),
                new Tuple2<Integer, String>(66, "hbase")
        );

        JavaRDD<Tuple2<Integer, String>> listRDD = sc.parallelize(list);
        // mapToPair turns the JavaRDD<Tuple2> into a JavaPairRDD so that sortByKey is available
        listRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<Integer, String>(t._1, t._2);
            }
        }).sortByKey().foreach(new VoidFunction<Tuple2<Integer, String>>() {
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._2 + "<--->" + t._1);
            }
        });
        sc.stop();
    }

    // aggregate the values of each key
    public static void reduceByKey() {
        final SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);

        final List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("hadoop", 90),
                new Tuple2<String, Integer>("spark", 80),
                new Tuple2<String, Integer>("hbase", 85),
                new Tuple2<String, Integer>("hive", 82)
        );

        final JavaRDD<Tuple2<String, Integer>> listRDD = sc.parallelize(list);
        listRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<String, Integer>(t._1, t._2);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + "==>" + t._2);
            }
        });
        sc.stop();
    }

    // group the tuples by their first element (groupBy is equivalent here to mapToPair + groupByKey)
    public static void groupByKey() {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<String, String>("hadoop", "had"),
                new Tuple2<String, String>("spark", "spa"),
                new Tuple2<String, String>("hbase", "hba"),
                new Tuple2<String, String>("hive", "hiv")
        );
        JavaRDD<Tuple2<String, String>> listRDD = sc.parallelize(list);
        listRDD.groupBy(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> t) throws Exception {
                return t._1;
            }
        }).foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, String>>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Tuple2<String, String>>> t) throws Exception {
                String menpai = t._1;
                Iterator<Tuple2<String, String>> iterator = t._2.iterator();
                System.out.println(menpai);
                while (iterator.hasNext()) {
                    Tuple2<String, String> ren = iterator.next();
                    //System.out.println(ren.toString());
                    System.out.println(ren._2);
                }
            }
        });
        sc.stop();
    }
    // split each line into words
    public static void flatMap() {
        SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("you,jump", "i,jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
        listRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(",")).iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        sc.stop();
    }
    // keep only the even numbers
    public static void filter() {
        final SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> listRDD = sc.parallelize(list);
        listRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) throws Exception {
                return i % 2 == 0;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t + "\t");
            }
        });
        sc.stop();
    }
    // transform each element one-to-one
    public static void map() {
        final SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = new ArrayList<String>();
        list.add("hadoop");
        list.add("spark");
        list.add("hbase");
        JavaRDD<String> listRDD = sc.parallelize(list);
        listRDD.map(new Function<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1993);
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + "\t" + t._2);
            }
        });
        sc.stop();
    }
}
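One closing note on the two grouping operators that appear in both versions: when the end result is a per-key aggregate, reduceByKey is generally preferred over groupByKey because it combines values within each partition before the shuffle, whereas groupByKey sends every individual pair across the network. Below is a minimal Scala sketch of the comparison, my own addition using the same sample data as the reduceByKey examples above (the object name GroupVsReduce is hypothetical).

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: both pipelines produce the same per-key sums.
object GroupVsReduce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupVsReduce").setMaster("local")
    val sc = new SparkContext(conf)
    val scores = sc.parallelize(List(("hadoop", 111), ("spark", 222), ("hadoop", 333), ("spark", 444)))

    // groupByKey shuffles every (key, value) pair, then sums on the receiving side
    scores.groupByKey().mapValues(_.sum).foreach(t => println(t._1 + "\t" + t._2))

    // reduceByKey pre-aggregates map-side, so less data crosses the shuffle for the same result
    scores.reduceByKey(_ + _).foreach(t => println(t._1 + "\t" + t._2))

    sc.stop()
  }
}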