My streaming Dataflow job (2017-09-08_03_55_43-9675407418829265662), using Apache Beam SDK for Java 2.1.0, will not scale past one worker even with a growing Pub/Sub queue (now 100k undelivered messages). Do you have any ideas why?
It's currently running with autoscalingAlgorithm=THROUGHPUT_BASED and maxNumWorkers=10.
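For context, this is roughly how those two options are wired up in code rather than on the command line. A minimal sketch assuming the standard Dataflow runner options interface; everything here mirrors the flags above, nothing else about my pipeline is shown:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class Launch {
  public static void main(String[] args) {
    // Equivalent to --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=10
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    options.setAutoscalingAlgorithm(
        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(10);
  }
}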
2 Answers
#1
1
Dataflow Engineer here. I looked up the job in our backend and I can see that it is not scaling up because CPU utilization is low, meaning something else is limiting the performance of the pipeline, such as external throttling. Upscaling rarely helps in these cases.
I see that some bundles are taking hours to process. I recommend investigating your pipeline logic to see if there are parts that can be optimized.
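One way to narrow down which step is slow is to time suspicious DoFns with Beam's Metrics API, so per-element latency shows up as a custom metric in the Dataflow UI. A sketch, not from the original thread; the DoFn and the callExternalService helper are hypothetical stand-ins for whatever external dependency the pipeline hits:

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that records how long each external call takes.
class TimedEnrichFn extends DoFn<String, String> {
  private final Distribution latencyMs =
      Metrics.distribution(TimedEnrichFn.class, "external-call-ms");

  @ProcessElement
  public void process(ProcessContext c) {
    long start = System.currentTimeMillis();
    String result = callExternalService(c.element());  // hypothetical slow call
    latencyMs.update(System.currentTimeMillis() - start);
    c.output(result);
  }

  private String callExternalService(String input) {
    return input;  // placeholder for the real external dependency
  }
}

If the recorded distribution shows long tails, the bottleneck is the external call, and adding workers won't help until that is addressed.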
#2
0
This is what I ended up with:
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.concurrent.ThreadLocalRandom;

// Breaks fusion by keying each element with a random integer in [0, size),
// reshuffling, and then dropping the keys again.
public class ReshuffleWithRandomKey<T>
    extends PTransform<PCollection<T>, PCollection<T>> {

  private final int size;

  public ReshuffleWithRandomKey(int size) {
    this.size = size;
  }

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        .apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
        .apply("Reshuffle", Reshuffle.<Integer, T>of())
        .apply("Values", Values.<T>create());
  }

  // Assigns a random key so elements spread evenly across workers.
  private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    private final int size;

    AssignRandomKeyFn(int size) {
      this.size = size;
    }

    @ProcessElement
    public void process(ProcessContext c) {
      c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
    }
  }
}
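For reference, it is applied like any other composite transform. The collection name messages and the key-space size 256 below are illustrative, not from my pipeline:

PCollection<String> messages = ...;  // e.g. what comes out of the Pub/Sub read
PCollection<String> reshuffled =
    messages.apply("Break fusion", new ReshuffleWithRandomKey<String>(256));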
What do you think @raghu-angadi and @scott-wegner?