Performing an action after a Dataflow pipeline has finished processing all data

Date: 2021-07-19 15:35:10

Is it possible to perform an action once a batch Dataflow job has finished processing all data? Specifically, I'd like to move the text file that the pipeline just processed to a different GCS bucket. I'm not sure where to place that in my pipeline to ensure it executes once after the data processing has completed.


2 solutions

#1


3  

I don't see why you need to do this post-pipeline execution. You could use side outputs to write the file to multiple buckets, and save yourself the copy after the pipeline finishes.

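As a sketch of that side-output approach, assuming the Apache Beam Java SDK (the tags, bucket paths, and the `processLine` transform are illustrative, not from the question):

```java
// Sketch only: assumes the Apache Beam Java SDK; paths and names are placeholders.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SideOutputSketch {
  public static void main(String[] args) {
    final TupleTag<String> mainTag = new TupleTag<String>() {};
    final TupleTag<String> archiveTag = new TupleTag<String>() {};

    Pipeline pipeline = Pipeline.create();
    PCollectionTuple results = pipeline
        .apply(TextIO.read().from("gs://input-bucket/input.txt"))
        .apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext c) {
              c.output(processLine(c.element()));  // main output: processed data
              c.output(archiveTag, c.element());   // side output: untouched copy
            }
          }).withOutputTags(mainTag, TupleTagList.of(archiveTag)));

    // Each output goes to its own bucket in a single pipeline run.
    results.get(mainTag).apply(TextIO.write().to("gs://output-bucket/processed"));
    results.get(archiveTag).apply(TextIO.write().to("gs://archive-bucket/raw"));
    pipeline.run().waitUntilFinish();
  }

  // Placeholder for whatever per-line processing the pipeline does.
  static String processLine(String line) {
    return line.trim();
  }
}
```

Note this writes a copy of the input to the second bucket rather than moving the original file; if the source file must actually disappear, you still need the post-run copy-and-delete from the second suggestion.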

If that's not going to work for you (for whatever reason), then you can simply run your pipeline in blocking execution mode i.e. use pipeline.run().waitUntilFinish(), and then just write the rest of your code (which does the copy) after that.


[..]
// do some stuff before the pipeline runs
Pipeline pipeline = ...
pipeline.run().waitUntilFinish();
// do something after the pipeline finishes here
[..]
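For the "do something after" part, a sketch of the copy itself using the google-cloud-storage Java client (bucket and object names are placeholders; your project's credentials are assumed to be available via the default environment):

```java
// Sketch only: assumes the google-cloud-storage Java client library;
// bucket and object names are placeholders.
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class MoveProcessedFile {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobId source = BlobId.of("input-bucket", "processed/input.txt");
    BlobId target = BlobId.of("done-bucket", "processed/input.txt");

    // GCS has no atomic move: copy to the destination, then delete the original.
    storage.copy(Storage.CopyRequest.of(source, target)).getResult();
    storage.delete(source);
  }
}
```

Since `waitUntilFinish()` blocks until the job completes, this code only runs once all data has been processed, which is exactly the ordering the question asks for.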

#2


-1  

I think two options can help you here:


1) Use TextIO to write to the bucket or folder you want, specifying the exact GCS path (e.g. gs://sandbox/other-bucket)

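A minimal fragment of option 1, assuming the Beam Java SDK and a `PCollection<String>` named `output` (the path prefix and suffix are just examples):

```java
// Sketch only: writes the pipeline's output directly to the target bucket.
output.apply("WriteToOtherBucket",
    TextIO.write().to("gs://sandbox/other-bucket/part").withSuffix(".txt"));
```
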

2) Use Object Change Notifications in combination with Cloud Functions. You can find a good primer on doing this here, and the SDK for GCS in JS here. In this option you basically set up a trigger that fires when an object lands in a certain bucket, and move the object to another bucket using a self-written Cloud Function.

