I am trying to process json files in a bucket and write the results into a bucket:
我正在尝试处理存储桶中的json文件并将结果写入存储桶:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("the-project");
options.setStagingLocation("gs://some-bucket/temp/");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://some-bucket/2016/04/28/*/*.json"))
.apply(ParDo.named("SanitizeJson").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
try {
JsonFactory factory = JacksonFactory.getDefaultInstance();
String json = c.element();
SomeClass e = factory.fromString(json, SomeClass.class);
// manipulate the object a bit...
c.output(factory.toString(e));
} catch (Exception err) {
LOG.error("Failed to process element: " + c.element(), err);
}
}
}))
.apply(TextIO.Write.to("gs://some-bucket/output/"));
p.run();
I have around 50,000 files under the path gs://some-bucket/2016/04/28/ (in sub-directories). My question is: does it make sense that this takes more than an hour to complete? Doing something similar on a Spark cluster in amazon takes about 15-20 minutes. I suspect that I might be doing something inefficiently.
我在路径gs:// some-bucket / 2016/04/28 /(在子目录中)下有大约50,000个文件。我的问题是:这需要一个多小时才能完成吗?在亚马逊的Spark群集上做类似的事情需要大约15-20分钟。我怀疑我可能会做一些效率低下的事情。
EDIT:
编辑:
In my Spark job I aggregate all the results in a DataFrame and only then write the output, all at once. I noticed that my pipeline here writes each file separately, I assume that is why it's taking much longer. Is there a way to change this behavior?
在我的Spark作业中,我将所有结果聚合在一个DataFrame中,然后一次性写入输出。我注意到我的管道在这里分别写了每个文件,我想这就是为什么它需要更长的时间。有没有办法改变这种行为?
1 个解决方案
#1
0
Your jobs are hitting a couple of performance issues in Dataflow, caused by the fact that it is more optimized for executing work in larger increments, while your job is processing lots of very small files. As a result, some aspects of the job's execution end up dominated by per-file overhead. Here's some details and suggestions.
您的作业在Dataflow中遇到了几个性能问题,这是因为它更适合以更大的增量执行工作,而您的工作是处理大量非常小的文件。结果,作业执行的某些方面最终由每个文件开销占主导地位。这里有一些细节和建议。
- The job is limited rather by writing output than by reading input (though reading input is also a significant part). You can significantly cut that overhead by specifying
withNumShards
on yourTextIO.Write
, depending on how many files you want in the output. E.g.100
could be a reasonable value. By default you're getting an unspecified number of files which in this case, given current behavior of the Dataflow optimizer, matches number of input files: usually it is a good idea because it allows us to not materialize the intermediate data, but in this case it's not a good idea because the input files are so small and per-file overhead is more important. - 通过写入输出而不是通过读取输入来限制作业(尽管读取输入也是重要部分)。您可以通过在TextIO.Write上指定withNumShards来显着减少开销,具体取决于您在输出中需要多少文件。例如。 100可能是合理的价值。默认情况下,你得到一个未指定数量的文件,在这种情况下,给定数据流优化器的当前行为,匹配输入文件的数量:通常这是一个好主意,因为它允许我们不实现中间数据,但在此这不是一个好主意,因为输入文件太小,每个文件的开销更重要。
- I recommend to set
maxNumWorkers
to a value like e.g. 12 - currently the second job is autoscaling to an excessively large number of workers. This is caused by Dataflow's autoscaling currently being geared toward jobs that process data in larger increments - it currently doesn't take into account per-file overhead and behaves not so well in your case. - 我建议将maxNumWorkers设置为一个值,例如12 - 目前第二项工作是自动扩展到过多的工人。这是由于Dataflow的自动调节当前面向以更大增量处理数据的作业 - 它目前没有考虑到每个文件的开销,并且在您的情况下表现不佳。
- The second job is also hitting a bug because of which it fails to finalize the written output. We're investigating, however setting
maxNumWorkers
should also make it complete successfully. - 第二个工作也遇到了一个错误,因为它无法完成书面输出。我们正在调查,但设置maxNumWorkers也应该成功完成。
To put it shortly:
简而言之:
- set
maxNumWorkers=12
- 设置maxNumWorkers = 12
- set
TextIO.Write.to("...").withNumShards(100)
- 设置TextIO.Write.to(“...”)。withNumShards(100)
and it should run much better.
它应该运行得更好。
#1
0
Your jobs are hitting a couple of performance issues in Dataflow, caused by the fact that it is more optimized for executing work in larger increments, while your job is processing lots of very small files. As a result, some aspects of the job's execution end up dominated by per-file overhead. Here's some details and suggestions.
您的作业在Dataflow中遇到了几个性能问题,这是因为它更适合以更大的增量执行工作,而您的工作是处理大量非常小的文件。结果,作业执行的某些方面最终由每个文件开销占主导地位。这里有一些细节和建议。
- The job is limited rather by writing output than by reading input (though reading input is also a significant part). You can significantly cut that overhead by specifying
withNumShards
on yourTextIO.Write
, depending on how many files you want in the output. E.g.100
could be a reasonable value. By default you're getting an unspecified number of files which in this case, given current behavior of the Dataflow optimizer, matches number of input files: usually it is a good idea because it allows us to not materialize the intermediate data, but in this case it's not a good idea because the input files are so small and per-file overhead is more important. - 通过写入输出而不是通过读取输入来限制作业(尽管读取输入也是重要部分)。您可以通过在TextIO.Write上指定withNumShards来显着减少开销,具体取决于您在输出中需要多少文件。例如。 100可能是合理的价值。默认情况下,你得到一个未指定数量的文件,在这种情况下,给定数据流优化器的当前行为,匹配输入文件的数量:通常这是一个好主意,因为它允许我们不实现中间数据,但在此这不是一个好主意,因为输入文件太小,每个文件的开销更重要。
- I recommend to set
maxNumWorkers
to a value like e.g. 12 - currently the second job is autoscaling to an excessively large number of workers. This is caused by Dataflow's autoscaling currently being geared toward jobs that process data in larger increments - it currently doesn't take into account per-file overhead and behaves not so well in your case. - 我建议将maxNumWorkers设置为一个值,例如12 - 目前第二项工作是自动扩展到过多的工人。这是由于Dataflow的自动调节当前面向以更大增量处理数据的作业 - 它目前没有考虑到每个文件的开销,并且在您的情况下表现不佳。
- The second job is also hitting a bug because of which it fails to finalize the written output. We're investigating, however setting
maxNumWorkers
should also make it complete successfully. - 第二个工作也遇到了一个错误,因为它无法完成书面输出。我们正在调查,但设置maxNumWorkers也应该成功完成。
To put it shortly:
简而言之:
- set
maxNumWorkers=12
- 设置maxNumWorkers = 12
- set
TextIO.Write.to("...").withNumShards(100)
- 设置TextIO.Write.to(“...”)。withNumShards(100)
and it should run much better.
它应该运行得更好。