3 个解决方案
#1
你可以在foreachPartition算子内进行判断,然后根据条件执行指定的方法。
例如
例如
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> it) throws Exception {
if( xxxxx ) { // 条件。比如遍历该分区的数据去取某个特征
new RealForeachPartitionFunc1().call(it); // 执行真正的foreachPartition算子
} else {
new RealForeachPartitionFunc2().call(it);
}
}
});
#2
自定义分区,rdd.partitionBy(/*自定义的分区*/new TestPartitioner()).foreachPartition {/*对分区内数据的操作代码*/}
class TestPartitioner extends Partitioner {
//redis分区个数
override def numPartitions: Int = ???
override def getPartition(key: Any): Int = {
/*redis中的分区规则*/
}
}
class TestPartitioner extends Partitioner {
//redis分区个数
override def numPartitions: Int = ???
override def getPartition(key: Any): Int = {
/*redis中的分区规则*/
}
}
#3
自定义分区
#1
你可以在foreachPartition算子内进行判断,然后根据条件执行指定的方法。
例如
例如
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> it) throws Exception {
if( xxxxx ) { // 条件。比如遍历该分区的数据去取某个特征
new RealForeachPartitionFunc1().call(it); // 执行真正的foreachPartition算子
} else {
new RealForeachPartitionFunc2().call(it);
}
}
});
#2
自定义分区,rdd.partitionBy(/*自定义的分区*/new TestPartitioner()).foreachPartition {/*对分区内数据的操作代码*/}
class TestPartitioner extends Partitioner {
//redis分区个数
override def numPartitions: Int = ???
override def getPartition(key: Any): Int = {
/*redis中的分区规则*/
}
}
class TestPartitioner extends Partitioner {
//redis分区个数
override def numPartitions: Int = ???
override def getPartition(key: Any): Int = {
/*redis中的分区规则*/
}
}
#3
自定义分区