How can I create my own counters in my DoFns?
In my DoFn I'd like to increment a counter every time a condition is met when processing a record. I'd like this counter to sum the values across all records.
1 Answer
You can use Aggregators, and the total values of the counters will show up in the UI.
Here is an example where I experimented with Aggregators in a pipeline that just makes each of numOutputShards workers sleep for sleepSecs seconds. (The GenFakeInput PTransform at the beginning just returns a flattened PCollection<String> of size numOutputShards.)
PCollection<String> output = p
    .apply(new GenFakeInput(options.getNumOutputShards()))
    .apply(ParDo.named("Sleep").of(new DoFn<String, String>() {
      // sleepSecs and LOG come from the enclosing code (not shown here).
      private Aggregator<Long> tSleepSecs;
      private Aggregator<Integer> tWorkers;
      private Aggregator<Long> tExecTime;
      private long startTimeMillis;

      @Override
      public void startBundle(Context c) {
        // Each aggregator gets a display name and a combine function.
        tSleepSecs = c.createAggregator("Total Slept (sec)", new Sum.SumLongFn());
        tWorkers = c.createAggregator("Num Workers", new Sum.SumIntegerFn());
        tExecTime = c.createAggregator("Total Wallclock (sec)", new Sum.SumLongFn());
        startTimeMillis = System.currentTimeMillis();
      }

      @Override
      public void finishBundle(Context c) {
        // Add this bundle's elapsed wall-clock time, in whole seconds.
        tExecTime.addValue((System.currentTimeMillis() - startTimeMillis) / 1000);
      }

      @Override
      public void processElement(ProcessContext c) {
        try {
          LOG.info("Sleeping for {} seconds.", sleepSecs);
          // Increment the counters once per processed element.
          tSleepSecs.addValue(sleepSecs);
          tWorkers.addValue(1);
          TimeUnit.SECONDS.sleep(sleepSecs);
        } catch (InterruptedException e) {
          LOG.info("Ignoring caught InterruptedException during sleep.");
        }
        // Pass the element through unchanged.
        c.output(c.element());
      }
    }));
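If it helps to see the "increment when a condition is met, then sum across all records" behaviour without running a pipeline, here is a minimal plain-Java sketch. It does not use the Dataflow SDK at all: ConditionalCounterSketch, processBundle and countMatching are hypothetical names made up for this illustration, and a java.util.concurrent.atomic.LongAdder stands in for a sum aggregator. Treat it as a model of the semantics under those assumptions, not as how the service implements aggregators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;

// Plain-Java illustration only; none of these names come from the Dataflow SDK.
class ConditionalCounterSketch {

    // Processes one bundle of records and bumps the shared counter whenever
    // the condition holds, much as a DoFn would call addValue on an aggregator.
    static void processBundle(List<String> bundle, Predicate<String> condition, LongAdder counter) {
        for (String record : bundle) {
            if (condition.test(record)) {
                counter.increment();
            }
        }
    }

    // Processes the bundles (possibly in parallel) and returns the summed count.
    static long countMatching(List<List<String>> bundles, Predicate<String> condition) {
        LongAdder counter = new LongAdder(); // thread-safe running sum
        bundles.parallelStream().forEach(b -> processBundle(b, condition, counter));
        return counter.sum();
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(
            Arrays.asList("ok", "", "ok"),
            Arrays.asList("", "ok"));
        // Count the empty records across both bundles.
        long empties = countMatching(bundles, String::isEmpty);
        System.out.println("Empty records: " + empties);
    }
}
```

The point of the sketch is that each bundle only ever adds to the counter, so the order in which bundles finish does not affect the final total.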