如何使用java sdk查找Google DataFlow中每个步骤的总执行时间

时间:2021-04-04 15:46:07

I am running DataFlow job in Google cloud platform using Apache-beam-2.3.0. Each dataFlow job has 5 steps. I want to track time taken to complete each steps in a job using java SDK

我正在使用Apache-beam-2.3.0在Google云平台上运行DataFlow作业。每个dataFlow作业都有5个步骤。我想跟踪使用java SDK完成作业中每个步骤所花费的时间

Pipeline pipeline = Pipeline.create(options);

for(int i=0; i<5; i++) {
PCollection<String> csv = pipeline.apply(transform1);
csv.apply(transform2);
}

pipeline.run().waitUntilFinish();

How to measure the time taken to complete each step in a job using PipelineResult

如何使用PipelineResult测量完成作业中每个步骤所需的时间

1 个解决方案

#1


1  

You can use queryMetrics with PipelineResult to see metrics at the step level. For example:

您可以将queryMetrics与PipelineResult一起使用,以查看步骤级别的指标。例如:

Pipeline p = ...;
 p.apply("create1", Create.of("hello")).apply("myStepName1", ParDo.of(new SomeDoFn()));
 p.apply("create2", Create.of("world")).apply("myStepName2", ParDo.of(new SomeDoFn()));
 PipelineResult result = p.run();
 MetricResults metrics = result.metrics();
 MetricQueryResults metricResults = metrics.queryMetrics(new MetricsFilter.Builder()
     .addNameFilter("my-counter")
     .addStepFilter("myStepName1").addStepFilter("myStepName2")
     .build());
 Iterable<MetricResult<Long>> counters = metricResults.counters();
 // counters should contain the value of my-counter reported from each of the ParDo
 // applications.

In this case, instead of a counter you could define a distribution metric as explained here. Some examples in this link.

在这种情况下,您可以定义分配指标而不是计数器,如此处所述。此链接中的一些示例。

#1


1  

You can use queryMetrics with PipelineResult to see metrics at the step level. For example:

您可以将queryMetrics与PipelineResult一起使用,以查看步骤级别的指标。例如:

Pipeline p = ...;
 p.apply("create1", Create.of("hello")).apply("myStepName1", ParDo.of(new SomeDoFn()));
 p.apply("create2", Create.of("world")).apply("myStepName2", ParDo.of(new SomeDoFn()));
 PipelineResult result = p.run();
 MetricResults metrics = result.metrics();
 MetricQueryResults metricResults = metrics.queryMetrics(new MetricsFilter.Builder()
     .addNameFilter("my-counter")
     .addStepFilter("myStepName1").addStepFilter("myStepName2")
     .build());
 Iterable<MetricResult<Long>> counters = metricResults.counters();
 // counters should contain the value of my-counter reported from each of the ParDo
 // applications.

In this case, instead of a counter you could define a distribution metric as explained here. Some examples in this link.

在这种情况下,您可以定义分配指标而不是计数器,如此处所述。此链接中的一些示例。