Error creating a Dataflow template with TextIO and ValueProvider

Asked: 2021-08-11 15:22:14

I am trying to create a Google Dataflow template, but I can't seem to find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
        at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)

I can reproduce it with a simple modified version of the MinimalWordCount example from Beam.

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MyMinimalWordCount {

    public interface WordCountOptions extends PipelineOptions {
        @Description("Path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> valueProvider);
    }

    public static void main(String[] args) {

        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from(options.getInputFile()))

                .apply(FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(Count.perElement())
                .apply(MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        // Calling waitUntilFinish() here causes an NPE when creating a Dataflow template
        //p.run().waitUntilFinish();

        p.run();
    }
}

I can run the example locally with:

mvn compile exec:java \
     -Pdirect-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--inputFile=pom.xml " 

It also runs on Google Dataflow with:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --inputFile=gs://[bucket]/input.csv "

But when I try to create a Google Dataflow template with the following, I get the error:

mvn compile exec:java \
     -Pdataflow-runner \
     -Dexec.mainClass=org.apache.beam.examples.MyMinimalWordCount \
     -Dexec.args="--runner=DataflowRunner \
                  --project=[project] \
                  --stagingLocation=gs://[bucket]/staging \
                  --templateLocation=gs://[bucket]/templates/MyMinimalWordCountTemplate " 

The other confusing thing is that the Maven build continues and ends with BUILD SUCCESS.

So my questions are:

Q1) Should I be able to create a Google Dataflow template like this (using ValueProviders to provide TextIO input at runtime)?

Q2) Is the exception during the build a real error or just a WARNING as the logging seems to indicate?

Q3) If the answers to Q1 and Q2 are yes and 'just a warning' and I try to create a job from the uploaded template why does it not have any metadata or know about my input options?

References I have used:

2 Answers

#1 (score: 0)

The correct answer is that you do not have to provide an input when creating the template; the input is taken as a value at run time. The exception is an internal issue in Google Dataflow that should be removed in the future.
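
Concretely, once the template has been created without `--inputFile`, the value is supplied when a job is launched from it. A sketch using the gcloud CLI, assuming the template location from the question (flag names follow the classic-templates workflow; check them against your gcloud version):

```shell
# Launch a job from the staged template, passing inputFile as a runtime parameter
gcloud dataflow jobs run my-wordcount-job \
    --gcs-location gs://[bucket]/templates/MyMinimalWordCountTemplate \
    --parameters inputFile=gs://[bucket]/input.csv
```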

#2 (score: -1)

I believe the --inputFile value is bundled into the template when the template is created.

Please see note 1: "In addition to the template file, templated pipeline execution also relies on files that were staged and referenced at the time of template creation. If the staged files are moved or removed, your pipeline execution will fail."
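
This also bears on Q3: classic Dataflow templates do not derive parameter metadata from the PipelineOptions interface. The console only shows parameters if a metadata file named `<template-name>_metadata` is staged in the same Cloud Storage folder as the template. A minimal sketch, assuming the template name from the question (the label and help text are illustrative):

```json
{
  "name": "MyMinimalWordCount",
  "description": "Counts words in a text file",
  "parameters": [
    {
      "name": "inputFile",
      "label": "Input file",
      "helpText": "Path of the file to read from",
      "isOptional": false
    }
  ]
}
```

This file would be uploaded as gs://[bucket]/templates/MyMinimalWordCountTemplate_metadata.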

This thread seems relevant as well 2
