package com.sparkproject.abc;

import org.apache.spark.AccumulatorParam;

/**
 * A custom accumulator that concatenates string values behind an
 * "Accumulator:" prefix, e.g. "Accumulator:012345".
 */
public class UDFAccumulatorClass implements AccumulatorParam<String> {

    private static final long serialVersionUID = 1L;

    private static final String PREFIX = "Accumulator:";

    // Called for every value added to the accumulator on an executor.
    @Override
    public String addAccumulator(String v1, String v2) {
        return add(v1, v2);
    }

    // Called when Spark merges per-partition results back on the driver.
    @Override
    public String addInPlace(String r1, String r2) {
        return add(r1, r2);
    }

    // The identity value each task starts from.
    @Override
    public String zero(String initialValue) {
        return PREFIX + "0";
    }

    private String add(String v1, String v2) {
        if (v1 == null || v1.isEmpty()) {
            return v2;
        }
        // When merging two partial results (addInPlace), v2 carries its own
        // "Accumulator:0" zero prefix; strip it so it is not duplicated in
        // the combined value. Plain values added on executors pass through.
        if (v2.startsWith(PREFIX)) {
            v2 = v2.substring(PREFIX.length() + 1);
        }
        return v1 + v2;
    }
}
package com.sparkproject.abc;

import java.util.Arrays;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class UDFAccumulator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Accumu");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create the custom accumulator with an empty initial value.
        final Accumulator<String> udf = sc.accumulator("", new UDFAccumulatorClass());

        JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Add each element to the accumulator inside a transformation.
        // Note: updates made in transformations can be applied more than once
        // if a stage is retried; for exact results update inside an action.
        JavaPairRDD<Integer, Integer> javaPairRDD =
                javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
                        udf.add(integer + "");
                        return new Tuple2<Integer, Integer>(integer, integer);
                    }
                });

        // foreach is an action, so it triggers the job and, with it,
        // the accumulator updates above.
        javaPairRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
                System.out.println(integerIntegerTuple2);
            }
        });

        // With a single local partition this prints "Accumulator:012345".
        System.out.println(udf.value());

        sc.stop();
    }
}
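Note that AccumulatorParam was deprecated in Spark 2.0 and removed in Spark 3.x. On newer versions the same idea is expressed by extending org.apache.spark.util.AccumulatorV2. Below is a minimal sketch of an equivalent string accumulator; the class name StringAccumulatorV2 is ours, not part of Spark.

package com.sparkproject.abc;

import org.apache.spark.util.AccumulatorV2;

// Hypothetical AccumulatorV2 equivalent of UDFAccumulatorClass above.
public class StringAccumulatorV2 extends AccumulatorV2<String, String> {

    private static final String ZERO = "Accumulator:0";

    private StringBuilder result = new StringBuilder(ZERO);

    @Override
    public boolean isZero() {
        return ZERO.contentEquals(result);
    }

    @Override
    public AccumulatorV2<String, String> copy() {
        StringAccumulatorV2 copy = new StringAccumulatorV2();
        copy.result = new StringBuilder(result);
        return copy;
    }

    @Override
    public void reset() {
        result = new StringBuilder(ZERO);
    }

    // Called for every value added on an executor.
    @Override
    public void add(String v) {
        result.append(v);
    }

    // Called when Spark merges per-partition copies on the driver.
    @Override
    public void merge(AccumulatorV2<String, String> other) {
        String v = other.value();
        // Drop the other copy's zero prefix so it is not duplicated.
        result.append(v.startsWith(ZERO) ? v.substring(ZERO.length()) : v);
    }

    @Override
    public String value() {
        return result.toString();
    }
}

In the driver, instead of sc.accumulator(...), you would instantiate and register it, then call add() inside the closure exactly as before:

        StringAccumulatorV2 udf = new StringAccumulatorV2();
        sc.sc().register(udf, "udf");
        // ... udf.add(integer + "") inside the transformation ...
        System.out.println(udf.value());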