在spark中如何手动将任务平均分配到不同的partition中啊

时间:2022-09-29 11:15:18
各位大牛们,我现在有这样一个问题,我从redis中取数据,redis中的数据是安partition存储的,我需要让不同patition中的数据在各自的partition中分别处理,应该如何做啊。应该如何处理呢?

3 个解决方案

#1


你可以在foreachPartition算子内进行判断,然后根据条件执行指定的方法。
例如

rdd.foreachPartition(new VoidFunction<Iterator<String>>() {

                    @Override
                    public void call(Iterator<String> it) throws Exception {
                        if( xxxxx ) { // 条件。比如遍历该分区的数据去取某个特征
                            new RealForeachPartitionFunc1().call(it);  // 执行真正的foreachPartition算子
                        } else {
                            new RealForeachPartitionFunc2().call(it); 
                        }
                   }
});

#2


自定义分区,rdd.partitionBy(/*自定义的分区*/new TestPartitioner()).foreachPartition {/*对分区内数据的操作代码*/}

class TestPartitioner extends Partitioner {

  //redis分区个数
  override def numPartitions: Int = ???

  override def getPartition(key: Any): Int = {
    /*redis中的分区规则*/
  }
}

#3


自定义分区 在spark中如何手动将任务平均分配到不同的partition中啊

#1


你可以在foreachPartition算子内进行判断,然后根据条件执行指定的方法。
例如

rdd.foreachPartition(new VoidFunction<Iterator<String>>() {

                    @Override
                    public void call(Iterator<String> it) throws Exception {
                        if( xxxxx ) { // 条件。比如遍历该分区的数据去取某个特征
                            new RealForeachPartitionFunc1().call(it);  // 执行真正的foreachPartition算子
                        } else {
                            new RealForeachPartitionFunc2().call(it); 
                        }
                   }
});

#2


自定义分区,rdd.partitionBy(/*自定义的分区*/new TestPartitioner()).foreachPartition {/*对分区内数据的操作代码*/}

class TestPartitioner extends Partitioner {

  //redis分区个数
  override def numPartitions: Int = ???

  override def getPartition(key: Any): Int = {
    /*redis中的分区规则*/
  }
}

#3


自定义分区 在spark中如何手动将任务平均分配到不同的partition中啊