I am running Dataflow jobs on Google Cloud Platform using Apache Beam 2.3.0. Each Dataflow job has 5 steps. I want to track the time taken to complete each step in a job using the Java SDK:
Pipeline pipeline = Pipeline.create(options);
for (int i = 0; i < 5; i++) {
    PCollection<String> csv = pipeline.apply(transform1);
    csv.apply(transform2);
}
pipeline.run().waitUntilFinish();
How can I measure the time taken to complete each step in a job using PipelineResult?
You can call queryMetrics on the MetricResults returned by PipelineResult.metrics() to see metrics at the step level. For example:
// SomeDoFn is assumed to report the counter, e.g. through a field
// Counter c = Metrics.counter(SomeDoFn.class, "my-counter") incremented in its @ProcessElement method.
Pipeline p = ...;
p.apply("create1", Create.of("hello")).apply("myStepName1", ParDo.of(new SomeDoFn()));
p.apply("create2", Create.of("world")).apply("myStepName2", ParDo.of(new SomeDoFn()));
PipelineResult result = p.run();
result.waitUntilFinish(); // wait for the job to finish so the reported values are final
MetricResults metrics = result.metrics();
MetricQueryResults metricResults = metrics.queryMetrics(MetricsFilter.builder()
    .addNameFilter(MetricNameFilter.named(SomeDoFn.class, "my-counter"))
    .addStep("myStepName1")
    .addStep("myStepName2")
    .build());
Iterable<MetricResult<Long>> counters = metricResults.counters();
// counters should contain the value of my-counter reported from each of the ParDo
// applications.
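In this case, instead of a counter you could define a distribution metric (Metrics.distribution) in each step's DoFn, update it with the time spent processing each element, and read the per-step results back with metricResults.distributions().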
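A minimal sketch of the distribution approach, assuming the Beam 2.3.0-era metrics accessors used above (counters()/distributions(), step(), attempted(), count(), sum(), max(); newer Beam releases renamed these to getX() forms) and that beam-sdks-java-core plus beam-runners-direct-java are on the classpath. The names StepTiming, TimedDoFn and "element-micros" are hypothetical, and toUpperCase() is only a stand-in for a step's real work. Note that this measures the time spent inside processElement summed over all elements of a step, which is a proxy for per-step processing cost rather than the wall-clock duration of the step (fused steps run concurrently).

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class StepTiming {

  // Hypothetical DoFn that records how many microseconds each element takes to process.
  static class TimedDoFn extends DoFn<String, String> {
    private final Distribution elementMicros =
        Metrics.distribution(TimedDoFn.class, "element-micros");

    @ProcessElement
    public void processElement(ProcessContext c) {
      long start = System.nanoTime();
      String out = c.element().toUpperCase(); // stand-in for the step's real work
      elementMicros.update((System.nanoTime() - start) / 1000);
      c.output(out);
    }
  }

  // Builds two named steps, runs the pipeline to completion and returns one
  // distribution result per step that reported "element-micros".
  static Iterable<MetricResult<DistributionResult>> run(PipelineOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("create1", Create.of("hello", "again"))
        .apply("myStepName1", ParDo.of(new TimedDoFn()));
    p.apply("create2", Create.of("world"))
        .apply("myStepName2", ParDo.of(new TimedDoFn()));

    PipelineResult result = p.run();
    result.waitUntilFinish();

    MetricQueryResults metricResults = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(TimedDoFn.class, "element-micros"))
            .build());
    return metricResults.distributions();
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    for (MetricResult<DistributionResult> d : run(options)) {
      // attempted() is used because not every runner supports committed metrics.
      DistributionResult r = d.attempted();
      System.out.println(d.step() + ": elements=" + r.count()
          + " totalMicros=" + r.sum() + " maxMicros=" + r.max());
    }
  }
}
```

Running it with the default DirectRunner prints one line per step; on Dataflow the same query works once the job has finished, and the sum divided by the count gives the mean per-element time for that step.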