Implementing Accumulator<Long> with JavaSparkContext in Spark 1.x

Time: 2022-05-04 15:18:15

As is well known, up to Spark 1.6.2, JavaSparkContext provides convenience methods for only two kinds of accumulators: Integer and Double.

Unfortunately, I ran into an Integer overflow, and the program returned a negative number.
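To make the overflow concrete, here is a minimal sketch in plain Java (no Spark involved) of what happens when a running total exceeds the 32-bit int range. The class and method names are hypothetical and chosen only for illustration; the int version mimics what an Integer accumulator does with each added value.

```java
// Minimal sketch; OverflowDemo and its methods are hypothetical names for illustration.
class OverflowDemo {
    // Adds `count` copies of `step` into a 32-bit int, as an Integer accumulator would.
    static int sumAsInt(int step, int count) {
        int total = 0;
        for (int i = 0; i < count; i++) {
            total += step; // silently wraps around once past Integer.MAX_VALUE
        }
        return total;
    }

    // Performs the same additions, but accumulates into a 64-bit long.
    static long sumAsLong(int step, int count) {
        long total = 0L;
        for (int i = 0; i < count; i++) {
            total += step;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sumAsInt(1_000_000_000, 3));  // prints -1294967296
        System.out.println(sumAsLong(1_000_000_000, 3)); // prints 3000000000
    }
}
```

Java raises no exception on int overflow, so the only symptom is a wrong (here negative) total, which is why a wider type is needed.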


So I had to use the underlying SparkContext to implement a Long accumulator.

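import java.io.Serializable;
import org.apache.spark.Accumulator;
import org.apache.spark.AccumulatorParam;

public static class LongAccumulatorParam implements AccumulatorParam<Long>, Serializable {
    // Adds a single value to the running total.
    @Override
    public Long addAccumulator(final Long r, final Long t) {
        return r + t;
    }

    // Merges two partial totals.
    @Override
    public Long addInPlace(final Long r1, final Long r2) {
        return r1 + r2;
    }

    // Identity value for addition.
    @Override
    public Long zero(final Long initialValue) {
        return 0L;
    }
}

// jsc is an existing JavaSparkContext; sc() exposes the underlying SparkContext.
final Accumulator<Long> acc = jsc.sc().accumulator(0L, new LongAccumulatorParam());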
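To illustrate how the three methods fit together, here is a rough sketch in plain Java that simulates the accumulation without Spark. The LongParamSketch interface and the MergeSketch class are hypothetical stand-ins, not Spark's own types, and the loop over partitions is only an approximation of what tasks and the driver do: each task starts from zero, adds its values with addAccumulator, and the partial totals are then merged with addInPlace.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the three methods; this is not Spark's interface.
interface LongParamSketch {
    Long zero(Long initialValue);
    Long addAccumulator(Long r, Long t);
    Long addInPlace(Long r1, Long r2);
}

class MergeSketch {
    static final LongParamSketch PARAM = new LongParamSketch() {
        public Long zero(Long initialValue) { return 0L; }
        public Long addAccumulator(Long r, Long t) { return r + t; }
        public Long addInPlace(Long r1, Long r2) { return r1 + r2; }
    };

    // Simulated run: every partition folds its values starting from zero,
    // then its partial total is merged into the overall result.
    static long run(Long initial, List<List<Long>> partitions) {
        Long result = initial;
        for (List<Long> partition : partitions) {
            Long partial = PARAM.zero(initial);
            for (Long value : partition) {
                partial = PARAM.addAccumulator(partial, value);
            }
            result = PARAM.addInPlace(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(1_000_000_000L, 1_000_000_000L),
                Arrays.asList(1_000_000_000L));
        System.out.println(run(0L, partitions)); // prints 3000000000
    }
}
```

Returning 0L from zero regardless of the initial value means each partial total starts from the additive identity, so the initial value is counted only once when the partials are merged.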

It is actually pretty simple. I haven't looked into Spark 2 yet; hopefully the developers have addressed this issue there.