Data loss when draining a Dataflow job that reads from PubSub and writes to Google Cloud Storage

Time: 2022-11-19 15:36:21

When a fixed number of strings (800,000 strings of 1 KB each were used for testing) is published to a PubSub topic and the following Apache Beam (2.1.0) job is run in Dataflow, exactly-once semantics are preserved as expected.


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGsSimpleJob {

    public static void main(String[] args) {
        PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(PubSubToGsPipelineOptions.class);
        Pipeline p = Pipeline.create(options);

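        // Read from the Pub/Sub subscription, apply fixed one-minute windows,
        // and write each window to GCS as a single shard.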
        p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
        p.run();
    }

}

The PipelineOptions implementation is below:


import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface PubSubToGsPipelineOptions extends PipelineOptions {
    @Description("PubSub subscription")
    String getInput();
    void setInput(String input);

    @Description("Google Cloud Storage output path")
    String getOutput();
    void setOutput(String output);
}
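
For reference, the fixed test input described at the top (800,000 strings of 1 KB each) could be produced with the Google Cloud Pub/Sub Java client along these lines. This is a minimal sketch, not code from the original question; the project and topic names are placeholders, and the payload contents are an assumption.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class PublishTestData {

    public static void main(String[] args) throws Exception {
        // Placeholder project and topic names.
        TopicName topic = TopicName.of("my-project", "my-topic");
        Publisher publisher = Publisher.newBuilder(topic).build();
        try {
            // Build a 1 KB payload and publish it 800,000 times, matching
            // the test described above.
            char[] filler = new char[1024];
            Arrays.fill(filler, 'x');
            String payload = new String(filler);
            for (int i = 0; i < 800_000; i++) {
                PubsubMessage message = PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8(payload))
                        .build();
                publisher.publish(message); // async; futures ignored in this sketch
            }
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

}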

However, if the same job is run, drained before all the elements are read (as shown in the Dataflow console), and then started again, the output files contain fewer records than the original data set that was published to the PubSub topic. This suggests that draining and replacing a job can cause data loss, which seems odd, since this Google Cloud blog post mentions that Drain and replace should have at-least-once semantics. How should this pipeline be designed to achieve at-least-once semantics (or, better yet, exactly-once semantics) when draining and replacing the job?


1 Answer

#1



My guess is that a window might be partially written before the drain, and the replacement job then overwrites it with a file containing only the remaining portion of the window. You can check the worker logs from the drained job and the replacement job for this log line in WriteFiles. If you use Beam HEAD, it also logs when the final destination is overwritten.


Conceptually, the drained job and the replacement job are entirely different pipelines; using the same output location for both is no different from using the same output location for two unrelated jobs. Another thing you could try is to use a different output path for the second job and verify that all the records exist across both directories (both ideas are sketched below).
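
A minimal sketch of the first idea, deriving a fresh output prefix per launch so the replacement job cannot overwrite window files left by the drained job. The per-run suffix scheme here is an illustration, not part of the original answer:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGsPerRunJob {

    public static void main(String[] args) {
        PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(PubSubToGsPipelineOptions.class);

        // Illustrative: give each launch its own output prefix so the drained
        // job's files and the replacement job's files never collide.
        String output = options.getOutput() + "/run-" + System.currentTimeMillis();

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(output));
        p.run();
    }

}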

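And a small batch pipeline that could be used for the verification step, counting records across both output directories. The bucket and directory names are placeholders, and a plain line count only works here because each published record is a single line:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

public class VerifyOutputCounts {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Placeholder paths for the drained job's output and the
        // replacement job's output.
        PCollection<String> firstRun = p.apply("ReadRun1",
                TextIO.read().from("gs://my-bucket/run1/*"));
        PCollection<String> secondRun = p.apply("ReadRun2",
                TextIO.read().from("gs://my-bucket/run2/*"));

        // Flatten both directories together and count the total records.
        PCollectionList.of(firstRun).and(secondRun)
                .apply(Flatten.pCollections())
                .apply(Count.globally())
                .apply(MapElements.into(TypeDescriptors.strings())
                        .via(count -> "total records: " + count))
                .apply(TextIO.write().to("gs://my-bucket/verify/count").withoutSharding());
        p.run();
    }

}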
