How to read from PubSub in Dataflow using batch mode

Time: 2022-07-09 15:22:14

In SDK 1.9.1, the Pubsub source provided the PubsubIO.Read.maxReadTime and PubsubIO.Read.maxNumRecords methods. These methods made it possible to create a bounded collection from Pub/Sub messages, so a Dataflow pipeline reading from Pub/Sub could be started in batch mode.

How can something similar be achieved with Dataflow SDK 2.1? How can I read from Pubsub in a Dataflow pipeline using batch mode?

1 solution

#1

Unfortunately, I didn't see any support for that in the newer versions of the SDK. What I did was implement a DoFn that reads from PubSub until maxReadTime has elapsed or maxNumRecords messages have been read, and then outputs the messages.

That is essentially what the previous versions of the SDK did internally; you can check the PubsubReader class there.

You will have to call it like this:

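    import org.apache.beam.sdk.coders.VoidCoder;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.ParDo;
    pipeline.begin()
        // One dummy element so that MyPubsubReader runs exactly once
        .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
        .apply(ParDo.of(new MyPubsubReader(maxNumRecords, maxReadTime)))
        .setCoder(coder);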
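The answer does not show MyPubsubReader itself. Below is a minimal sketch of only the stopping rule such a DoFn could apply inside its @ProcessElement method: keep pulling until maxNumRecords messages are collected or maxReadTime has passed, whichever comes first. It assumes a hypothetical MessagePuller interface standing in for a real synchronous Pub/Sub pull call; MessagePuller, BoundedPubsubPull and readBounded are illustrative names, not part of the Beam or Google Cloud APIs. The clock is injected so the time limit can be exercised without waiting.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a synchronous Pub/Sub pull: returns up to
// maxMessages payloads, possibly none. Not a real Google/Beam interface.
interface MessagePuller {
    List<String> pull(int maxMessages);
}

final class BoundedPubsubPull {
    private BoundedPubsubPull() {}

    /**
     * Collects messages until maxNumRecords have been read or maxReadTime has
     * elapsed, whichever happens first (the same limits PubsubIO.Read offered
     * in SDK 1.9.1).
     */
    static List<String> readBounded(MessagePuller puller, long maxNumRecords,
                                    Duration maxReadTime, Supplier<Instant> clock) {
        Instant deadline = clock.get().plus(maxReadTime);
        List<String> out = new ArrayList<>();
        while (out.size() < maxNumRecords && clock.get().isBefore(deadline)) {
            // Never ask for more than we still need.
            int remaining = (int) Math.min(Integer.MAX_VALUE, maxNumRecords - out.size());
            // An empty batch simply loops again until the deadline passes.
            for (String msg : puller.pull(remaining)) {
                if (out.size() >= maxNumRecords) {
                    break; // extras are dropped here, i.e. never emitted
                }
                out.add(msg);
            }
        }
        return out;
    }
}
```

In an actual DoFn, each collected message would be emitted through the output receiver and acknowledged; anything pulled but not emitted should be left unacknowledged so Pub/Sub can redeliver it later.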