How to tune a Spark application with a custom Hadoop input format

Time: 2020-12-06 23:11:07

My Spark application processes files (average size 20 MB) with a custom Hadoop input format and stores the result in HDFS.

Following is the code snippet.

Configuration conf = new Configuration();

JavaPairRDD<Text, Text> baseRDD = ctx
    .newAPIHadoopFile(input, CustomInputFormat.class, Text.class, Text.class, conf);

JavaRDD<myClass> mapPartitionsRDD = baseRDD
    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() {
        // my logic goes here
    });

// a few more transformations
result.saveAsTextFile(path);

This application creates one task/partition per file, processes it, and stores the corresponding part file in HDFS.

i.e., for 10,000 input files, 10,000 tasks are created and 10,000 part files are stored in HDFS.

Both mapPartitions and map operations on baseRDD create one task per file.
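For reference, the one-partition-per-file behavior is easy to confirm from the driver; a minimal sketch using the baseRDD from the snippet above (getNumPartitions() is available on Java RDDs since Spark 1.6; on older versions partitions().size() gives the same number):

// Each input file becomes one input split, hence one partition and one task.
System.out.println("partitions: " + baseRDD.getNumPartitions());
// With 10,000 input files this prints 10000, and saveAsTextFile()
// later writes one part file per partition.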

The SO question How to set the number of partitions for newAPIHadoopFile? suggests setting conf.setInt("mapred.max.split.size", 4); to configure the number of partitions.
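For context, split-size properties like this are read from the Hadoop Configuration passed to newAPIHadoopFile, and the value is in bytes, so 4 means a four-byte maximum split, which would shatter each 20 MB file into millions of splits. A hedged sketch (the 64 MB figure is purely illustrative; mapreduce.input.fileinputformat.split.maxsize is the new-API spelling of the same setting, and either key only matters if the input format's getSplits() honors it):

Configuration conf = new Configuration();
// Old-API property name; the value is in bytes (64 MB here, illustrative).
conf.setLong("mapred.max.split.size", 64L * 1024 * 1024);
// New-API equivalent consulted by FileInputFormat-based formats.
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);

JavaPairRDD<Text, Text> baseRDD = ctx
    .newAPIHadoopFile(input, CustomInputFormat.class, Text.class, Text.class, conf);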

But when this parameter is set, the CPU is utilized at its maximum and none of the stages start even after a long time.

If I don't set this parameter, the application completes successfully as mentioned above.

How do I set the number of partitions with newAPIHadoopFile and improve efficiency?

What happens with the mapred.max.split.size option?

============

Update: What happens with the mapred.max.split.size option?

In my use case the file sizes are small, so changing the split size options is irrelevant here: a maximum split size only caps how large a single split can grow, it does not combine several small files into one split.

More info in this SO question: Behavior of the parameter "mapred.min.split.size" in HDFS.

1 solution

#1


Just use baseRDD.repartition(<a sane amount>).mapPartitions(...). That will move the resulting operation to fewer partitions, especially if your files are small.

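Applied to the code from the question, that looks roughly like this (a sketch; the target of 100 partitions is an arbitrary placeholder, size it to your cluster and data volume):

int targetPartitions = 100; // hypothetical figure; tune to your cluster

JavaRDD<myClass> mapPartitionsRDD = baseRDD
    .repartition(targetPartitions) // shuffles the 10,000 small inputs into 100 partitions
    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() {
        // my logic goes here
    });

Since the goal here is fewer partitions, coalesce(targetPartitions) would avoid the full shuffle that repartition() performs, at the cost of a less even spread of the input files across tasks.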
