是否能够使用数据流将数据从pubsub流式传输到数据存储区?

时间:2022-07-09 15:21:38

I try to stream data from pubsub to datastore using dataflow.

我尝试使用数据流将数据从pubsub流式传输到数据存储区。

reference: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

参考:https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

I try to build template, but it does not work at all. So, I think that It is not possible.

我尝试构建模板,但它根本不起作用。所以,我认为这是不可能的。

How is it? Please give me some advice.

怎么样?请给我一些建议。

1 个解决方案

#1


1  

You may have stumbled on a bug in that specific template. There are two separate issues in it, first is the one answered in this SO question How to use google provided template [pubsub to Datastore]? which points to the missing errorTag and the second is that the writer to Datastore actually uses a GroupByKey when it writes the entities to the Datastore.

您可能偶然发现了该特定模板中的错误。它有两个独立的问题,首先是在这个SO问题中回答的问题如何使用谷歌提供的模板[pubsub to Datastore]?指向缺少的errorTag,第二个是数据存储区的编写者实际上使用GroupByKey将实体写入数据存储区。

If you run the maven compile command with the -e option it will show you the error message GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. Why does it do this? It has to do with the fact that messages are streamed from PubSub and not batched (which is what we would expect). That means that there is no finite set of items that are streaming in but a never ending stream of item. In order to work with that we need to limit it to a streaming window of items that can be considered by aggregation functions such as GroupByKey. The DatastoreConverters class that helps to write entities to the Datastore actually checks if we are trying to write the same key multiple times, and it does that by using the GroupByKeyfunction.

如果使用-e选项运行maven compile命令,它将显示错误消息GroupByKey无法应用于没有触发器的GlobalWindow中的无限制PCollection。在GroupByKey之前使用Window.into或Window.triggering变换。为什么这样做?这与消息从PubSub流式传输而不是批处理(这是我们期望的)这一事实有关。这意味着没有有限的项目集流入但是永无止境的项目流。为了使用它,我们需要将它限制为可以由聚合函数(如GroupByKey)考虑的项目的流式窗口。有助于将实体写入数据存储区的DatastoreConverters类实际上会检查我们是否尝试多次写入相同的密钥,并且它通过使用GroupByKeyfunction来执行此操作。

Simple solution, just give it a streaming window to work with, here is an added third .apply(...) in the pipeline that windows the stream together and allows you to use the datastore writer here:

简单的解决方案,只需给它一个流媒体窗口来使用,这里是管道中添加的第三个.apply(...),它将流窗口连在一起并允许您在这里使用数据存储区编写器:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

... 

  public static void main(String[] args) {
    PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(PubsubToDatastoreOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    pipeline
        .apply(PubsubIO.readStrings()
            .fromTopic(options.getPubsubReadTopic()))
        .apply(TransformTextViaJavascript.newBuilder()
            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
            .setFunctionName(options.getJavascriptTextTransformFunctionName())
            .build())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(WriteJsonEntities.newBuilder()
            .setProjectId(options.getDatastoreWriteProjectId())
            .setErrorTag(errorTag)
            .build());

    pipeline.run();
  }

Now there are other, and very possibly much better, ways of doing this but this will get your template compiled and working. This examples shows a FixedWindow of 1 second, there are other options for doing this, check the documentation for that Google DataFlow - Windowing.

现在还有其他的,可能更好的方法,但这将使您的模板编译和工作。此示例显示1秒的FixedWindow,还有其他选项可以执行此操作,请查看该Google DataFlow的文档 - 窗口。

Compile your template with:

使用以下代码编译模板:

mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"

And then startup the job with:

然后用以下方式启动工作:

gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"

Now you should see your job running in GCP console if you look there:

现在你应该看到你的工作在GCP控制台中运行,如果你看到那里:

是否能够使用数据流将数据从pubsub流式传输到数据存储区?

Note that this specific solution and chosen window will mean a delay of up to a second for the PubSub messages to end up in the Datastore. Shortening the window may help that a little, but in order to get a higher throughput than that you would need a different pipeline than the one shown here.

请注意,此特定解决方案和所选窗口将意味着PubSub消息最终会在数据存储区中延迟一秒钟。缩短窗口可能会有所帮助,但为了获得比您需要的管道更高的吞吐量而不是此处显示的管道。

#1


1  

You may have stumbled on a bug in that specific template. There are two separate issues in it, first is the one answered in this SO question How to use google provided template [pubsub to Datastore]? which points to the missing errorTag and the second is that the writer to Datastore actually uses a GroupByKey when it writes the entities to the Datastore.

您可能偶然发现了该特定模板中的错误。它有两个独立的问题,首先是在这个SO问题中回答的问题如何使用谷歌提供的模板[pubsub to Datastore]?指向缺少的errorTag,第二个是数据存储区的编写者实际上使用GroupByKey将实体写入数据存储区。

If you run the maven compile command with the -e option it will show you the error message GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. Why does it do this? It has to do with the fact that messages are streamed from PubSub and not batched (which is what we would expect). That means that there is no finite set of items that are streaming in but a never ending stream of item. In order to work with that we need to limit it to a streaming window of items that can be considered by aggregation functions such as GroupByKey. The DatastoreConverters class that helps to write entities to the Datastore actually checks if we are trying to write the same key multiple times, and it does that by using the GroupByKeyfunction.

如果使用-e选项运行maven compile命令,它将显示错误消息GroupByKey无法应用于没有触发器的GlobalWindow中的无限制PCollection。在GroupByKey之前使用Window.into或Window.triggering变换。为什么这样做?这与消息从PubSub流式传输而不是批处理(这是我们期望的)这一事实有关。这意味着没有有限的项目集流入但是永无止境的项目流。为了使用它,我们需要将它限制为可以由聚合函数(如GroupByKey)考虑的项目的流式窗口。有助于将实体写入数据存储区的DatastoreConverters类实际上会检查我们是否尝试多次写入相同的密钥,并且它通过使用GroupByKeyfunction来执行此操作。

Simple solution, just give it a streaming window to work with, here is an added third .apply(...) in the pipeline that windows the stream together and allows you to use the datastore writer here:

简单的解决方案,只需给它一个流媒体窗口来使用,这里是管道中添加的第三个.apply(...),它将流窗口连在一起并允许您在这里使用数据存储区编写器:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

... 

  public static void main(String[] args) {
    PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(PubsubToDatastoreOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    pipeline
        .apply(PubsubIO.readStrings()
            .fromTopic(options.getPubsubReadTopic()))
        .apply(TransformTextViaJavascript.newBuilder()
            .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
            .setFunctionName(options.getJavascriptTextTransformFunctionName())
            .build())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(WriteJsonEntities.newBuilder()
            .setProjectId(options.getDatastoreWriteProjectId())
            .setErrorTag(errorTag)
            .build());

    pipeline.run();
  }

Now there are other, and very possibly much better, ways of doing this but this will get your template compiled and working. This examples shows a FixedWindow of 1 second, there are other options for doing this, check the documentation for that Google DataFlow - Windowing.

现在还有其他的,可能更好的方法,但这将使您的模板编译和工作。此示例显示1秒的FixedWindow,还有其他选项可以执行此操作,请查看该Google DataFlow的文档 - 窗口。

Compile your template with:

使用以下代码编译模板:

mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"

And then startup the job with:

然后用以下方式启动工作:

gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"

Now you should see your job running in GCP console if you look there:

现在你应该看到你的工作在GCP控制台中运行,如果你看到那里:

是否能够使用数据流将数据从pubsub流式传输到数据存储区?

Note that this specific solution and chosen window will mean a delay of up to a second for the PubSub messages to end up in the Datastore. Shortening the window may help that a little, but in order to get a higher throughput than that you would need a different pipeline than the one shown here.

请注意,此特定解决方案和所选窗口将意味着PubSub消息最终会在数据存储区中延迟一秒钟。缩短窗口可能会有所帮助,但为了获得比您需要的管道更高的吞吐量而不是此处显示的管道。