I have this generic method in Scala:
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S],
Optional[S]]) : JavaPairDStream[K, S] = { ... }
When I call it from Java, neither of these compiles:
1
JavaPairDStream<String, Integer> stateDstream =
pairs.<Integer>updateStateByKey(...);
2
JavaPairDStream<String, Integer> stateDstream =
pairs.updateStateByKey(...);
How do I invoke the method correctly?
Error messages:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>,
int) in the type JavaPairDStream<String,Integer> is not applicable for
the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>,
HashPartitioner, JavaPairRDD<String,Integer>)
Edited: The whole function call (Java 8):
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    (values, state) -> {
        Integer newSum = state.or(0);
        for (Integer value : values) {
            newSum += value;
        }
        return Optional.of(newSum);
    };
JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(
    updateFunction,
    new HashPartitioner(context.defaultParallelism()),
    initialRDD);
Edited: It turned out that generics are not the issue; the arguments simply do not match any signature of the method.
1 solution
The problem is that you are passing in an initialRDD, while the method updateStateByKey does not have that as a parameter.
The closest signature is:
updateStateByKey[S](updateFunc: Function2[List[V], Optional[S], Optional[S]],
partitioner: Partitioner): JavaPairDStream[K, S]
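To illustrate, here is a minimal, self-contained sketch of a call that matches the two-argument shape quoted above. It does not use Spark: FakePairStream and FakePartitioner are hypothetical stand-ins invented for this example, java.util.Optional (with orElse) replaces the Optional type from the question, and the stand-in returns a plain Map rather than a stream. The only point is that with two arguments the call resolves and S is inferred as Integer without an explicit type witness.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class UpdateStateSketch {

    // Hypothetical stand-in for a partitioner; this is NOT Spark's HashPartitioner.
    static final class FakePartitioner {
        final int numPartitions;
        FakePartitioner(int numPartitions) { this.numPartitions = numPartitions; }
    }

    // Hypothetical stand-in for JavaPairDStream<K, V>; holds one batch of values per key.
    static final class FakePairStream<K, V> {
        private final Map<K, List<V>> batch;
        FakePairStream(Map<K, List<V>> batch) { this.batch = batch; }

        // Same parameter shape as the two-argument signature quoted above:
        // an update function plus a partitioner, and nothing else.
        <S> Map<K, S> updateStateByKey(
                BiFunction<List<V>, Optional<S>, Optional<S>> updateFunc,
                FakePartitioner partitioner) {
            Map<K, S> state = new HashMap<>();
            for (Map.Entry<K, List<V>> e : batch.entrySet()) {
                // This sketch keeps no earlier state, so every key starts empty.
                updateFunc.apply(e.getValue(), Optional.<S>empty())
                          .ifPresent(s -> state.put(e.getKey(), s));
            }
            return state;
        }
    }

    static Map<String, Integer> run() {
        Map<String, List<Integer>> batch = new HashMap<>();
        batch.put("a", Arrays.asList(1, 2, 3));
        batch.put("b", Arrays.asList(10));
        FakePairStream<String, Integer> pairs = new FakePairStream<>(batch);

        BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
            (values, state) -> {
                Integer newSum = state.orElse(0);
                for (Integer value : values) {
                    newSum += value;
                }
                return Optional.of(newSum);
            };

        // Two arguments match the method; S is inferred as Integer.
        // Appending a third argument here would be a compile-time error,
        // because the stand-in declares no three-parameter overload.
        return pairs.updateStateByKey(updateFunction, new FakePartitioner(2));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Applied to the code in the question, the equivalent change is to drop initialRDD from the call and pass only updateFunction and the HashPartitioner.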