Pipeline initialization step in Google Dataflow

Time: 2021-09-13 15:37:20

I need to clear a table before the pipeline gets its input data and I'd like this step to run as part of the pipeline itself, in the cloud, not locally.

This is what the code looks like at the moment; clearTable() runs locally:

    exactTargetIntegration.clearTable(); // runs locally
    Pipeline p = Pipeline.create(options);
    PCollection<String> readFromFile =
        p.apply(TextIO.Read.from(INPUT_FILES)); // runs in the cloud
    ...

Is it possible?

1 solution

#1

There is currently no way to ensure that an action takes place before a read within the same pipeline. If you need the operation to run in the cloud, you can run it as a separate pipeline that executes before the main one.

For example:

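    DataflowPipelineOptions options = ...; // e.g. from PipelineOptionsFactory
    // The blocking runner makes run() wait until the job has finished.
    options.setRunner(BlockingDataflowPipelineRunner.class);
    Pipeline deletePipeline = Pipeline.create(options);
    // ... apply the transforms that clear the table ...
    Pipeline mainPipeline = Pipeline.create(options);
    // ... apply the main transforms, e.g. TextIO.Read.from(INPUT_FILES) ...
    deletePipeline.run(); // returns only after the deletion job completes
    mainPipeline.run();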
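
The ordering in this answer comes entirely from the blocking runner: the second job is not submitted until the first has finished. To make that concrete without depending on the Dataflow SDK, here is a stdlib-only Java simulation, not Dataflow code. An ExecutorService stands in for the Dataflow service and Future.get() stands in for the blocking runner's wait. The class SequentialJobs, its methods, and the in-memory "table" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical simulation: not the Dataflow API, just the "wait, then start the next job" pattern.
public class SequentialJobs {

    // Submits a job and waits for it to finish before returning, mimicking
    // the behaviour the answer relies on from BlockingDataflowPipelineRunner.
    static void runBlocking(ExecutorService workers, Runnable job)
            throws InterruptedException, ExecutionException {
        Future<?> result = workers.submit(job);
        result.get(); // blocks; a failed job surfaces here as ExecutionException
    }

    // Runs a hypothetical "delete" job, then a hypothetical "main" job, against an
    // in-memory stand-in for the table, and returns the table's final contents.
    static List<String> runDeleteThenMain(List<String> table)
            throws InterruptedException, ExecutionException {
        // Two worker threads, so nothing but the blocking wait keeps the jobs apart.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            runBlocking(workers, table::clear);                 // deletion job
            runBlocking(workers, () -> table.add("fresh row")); // main job
        } finally {
            workers.shutdown();
        }
        return table;
    }

    public static void main(String[] args) throws Exception {
        List<String> table = new ArrayList<>(List.of("stale row 1", "stale row 2"));
        System.out.println(runDeleteThenMain(table)); // prints [fresh row]
    }
}
```

If runBlocking returned right after submit() (roughly what a non-blocking runner does once a job is submitted), the two jobs could overlap and the main job might see stale rows. The wait between them is what guarantees the table is cleared before the main job reads or writes it.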

Additionally, this use case is definitely something that we'd like to support; you can track the issue here: https://issues.apache.org/jira/browse/BEAM-65