As we all know , up to Spark 1.6.2, JavaSparkContext only provides two kinds of accumulators: Integer and Double.
However, unfortunately I've met with problems of Integer overflow and the program returned me a negative number.
So I have to use original sparkcontext to implement the Long accumulator.
public static class LongAccumulatorParam implements AccumulatorParam<Long>,Serializable {
@Override
public Long addAccumulator(final Long r, final Long t) {
return r + t;
}
@Override
public Long addInPlace(final Long r1, final Long r2) {
return r1 + r2;
}
@Override
public Long zero(final Long initialValue) {
return 0L;
}
}
final Accumulator<Long> acc = jsc.sc().accumulator(new Long(0), new LongAccumulatorParam());
Actually it is pretty simple. I haven't looked into Spark 2 yet, hope the developers have fixed this issue.