自动扩展期间Google Dataflow吞吐量下降

时间:2021-10-29 15:23:29

We have a dataflow streaming job which reads from PubSub, extracts some fields and writes to bigtable. We are observing that dataflow's throughput drops when it is autoscaling. For example, if the dataflow job is currently running with 2 workers and processing at the rate of 100 messages/sec, during autoscaling this rate of 100 messages/sec drops down and some times it drops down to nearly 0 and then increases to 500 messages/sec. We are seeing this every time, dataflow upscales. This is causing higher system lag during autoscaling and bigger spikes of unacknowledged messages in pub/sub.

我们有一个数据流流媒体作业,从PubSub读取,提取一些字段并写入bigtable。我们观察到数据流的吞吐量在自动缩放时会下降。例如,如果数据流作业当前正在以2个工作程序运行并以100个消息/秒的速率进行处理,则在自动缩放期间,此消息速率为100个/秒,有时会下降到接近0,然后增加到500个消息/秒。我们每次都看到这一点,数据流量上升。这导致自动缩放期间更高的系统延迟以及pub / sub中未确认​​消息的更大峰值。

Is this the expected behavior of dataflow autoscaling or is there a way to maintain this 100 messages/sec while it autoscales and minimize the spices of unacknowledged messages? (Please Note: 100 messages/sec and 500 messages/sec are just example figures)

这是数据流自动缩放的预期行为还是有一种方法可以在自动缩放和最小化未确认消息的香料时保持这100条消息/秒? (请注意:100条消息/秒和500条消息/秒只是示例数字)

job ID: 2017-10-23_12_29_09-11538967430775949506

工作ID:2017-10-23_12_29_09-11538967430775949506

I am attaching the screen shots of pub/sub stackdriver and dataflow autoscaling.自动扩展期间Google Dataflow吞吐量下降

我附加了pub / sub stackdriver和数据流自动缩放的屏幕截图。

自动扩展期间Google Dataflow吞吐量下降

自动扩展期间Google Dataflow吞吐量下降

There is drop in number of pull requests everytime dataflow autoscales. I could not take screenshot with timestamps, but drop in pull requests matches with the time data flow autoscaling. ===========EDIT========================

每次数据流自动调整时,拉取请求的数量都会下降。我无法截取带有时间戳的屏幕截图,但是下拉请求与时间数据流自动缩放匹配。 ===========编辑========================

We are writing to GCS in parallel using below mentioned windowing.

我们使用下面提到的窗口并行写入GCS。

inputCollection.apply("Windowing",
            Window.<String>into(FixedWindows.of(ONE_MINUTE))

.triggering(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(ONE_MINUTE))
                  .withAllowedLateness(ONE_HOUR)
                  .discardingFiredPanes()
                  )

    //Writing to GCS
                .apply(TextIO.write()
                            .withWindowedWrites()
                            .withNumShards(10)
                            .to(options.getOutputPath())
                            .withFilenamePolicy(
                                    new 
WindowedFileNames(options.getOutputPath())));

WindowedFileNames.java

WindowedFileNames.java

public class WindowedFileNames  extends FilenamePolicy implements OrangeStreamConstants{

/**
 * 
 */
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(WindowedFileNames.class);
protected final String outputPath;
public WindowedFileNames(String outputPath) {
    this.outputPath = outputPath;
}


@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
    IntervalWindow intervalWindow = (IntervalWindow) context.getWindow();
    DateTime date = intervalWindow.maxTimestamp().toDateTime(DateTimeZone.forID("America/New_York"));

    String fileName = String.format(FOLDER_EXPR, outputPath, //"orangestreaming", 
            DAY_FORMAT.print(date), HOUR_MIN_FORMAT.print(date) + HYPHEN + context.getShardNumber());
    logger.error(fileName+"::::: File name for the current minute");
     return outputDirectory
              .getCurrentDirectory()
              .resolve(fileName, StandardResolveOptions.RESOLVE_FILE);
}

@Override
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
    return null;
}





}

1 个解决方案

#1


0  

What is actually happening is that your throughput is decreasing first, and that is the reason that workers are scaling up.

实际发生的是您的吞吐量首先下降,这就是工人扩大规模的原因。

If you look at your pipeline around 1:30am, the series of events is like so:

如果你在凌晨1:30左右看一下你的管道,一系列的事件是这样的:

  1. Around 1:23am, throughput drops. This builds up the backlog.
  2. 凌晨1点23分左右,吞吐量下降。这会积压待办事项。
  3. Around 1:28am, the pipeline unblocks and starts making progress.
  4. 大约凌晨1点28分,管道解锁并开始取得进展。
  5. Due to the large backlog, the pipeline scales up to 30 workers.
  6. 由于积压量很大,管道最多可扩展到30名工人。

Also, if you look at the autoscaling UI, the justification for going up to 30 workers is:

此外,如果您查看自动缩放UI,最多可以支持30名工作人员:

"Raised the number of workers to 30 so that the pipeline can catch up with its backlog and keep up with its input rate."

“将工人数量增加到30人,以便管道能够赶上其积压并跟上其投入率。”

Hope that helps!

希望有所帮助!

#1


0  

What is actually happening is that your throughput is decreasing first, and that is the reason that workers are scaling up.

实际发生的是您的吞吐量首先下降,这就是工人扩大规模的原因。

If you look at your pipeline around 1:30am, the series of events is like so:

如果你在凌晨1:30左右看一下你的管道,一系列的事件是这样的:

  1. Around 1:23am, throughput drops. This builds up the backlog.
  2. 凌晨1点23分左右,吞吐量下降。这会积压待办事项。
  3. Around 1:28am, the pipeline unblocks and starts making progress.
  4. 大约凌晨1点28分,管道解锁并开始取得进展。
  5. Due to the large backlog, the pipeline scales up to 30 workers.
  6. 由于积压量很大,管道最多可扩展到30名工人。

Also, if you look at the autoscaling UI, the justification for going up to 30 workers is:

此外,如果您查看自动缩放UI,最多可以支持30名工作人员:

"Raised the number of workers to 30 so that the pipeline can catch up with its backlog and keep up with its input rate."

“将工人数量增加到30人,以便管道能够赶上其积压并跟上其投入率。”

Hope that helps!

希望有所帮助!