Using an existing Pub/Sub subscription in Google Dataflow

Time: 2020-11-29 15:39:11

I am using Google Dataflow, and in one of the steps I subscribe to a Pub/Sub topic using an already-created subscription. Here is the code snippet:

CustomPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
Pipeline p = Pipeline.create(options);

PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device data from PubSub")
        .subscription("projects/<projectID>/subscriptions/<subscriptionname>")
        .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
        .timestampLabel("ts")
        .withCoder(TableRowJsonCoder.of()));

When executed, the code above fails with the following error: Error processing pipeline. Causes: (b5e276ef8c76419f): Unrecognized input pubsub_subscription for step s1.

I am passing the correct subscription name and project ID, so I am not sure why I still get this error.

Any help would be appreciated.

1 solution

#1


Specifying one of the two sources, a topic or a subscription, should be enough; don't set both.

I suggest you try:

PCollection<TableRow> datastream = p
        .apply(PubsubIO.Read.named("Read device data from PubSub")
        .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
        .timestampLabel("ts")
        .withCoder(TableRowJsonCoder.of()));
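To make the "one source, not both" rule concrete, here is a minimal sketch in plain Java that validates the pipeline settings before the read is built. `PubsubSource`, `ofTopic`, `ofSubscription` and `fromOptions` are hypothetical names for illustration only; they are not part of the Dataflow or Beam SDK. The resource-name formats mirror the `projects/.../topics/...` and `projects/.../subscriptions/...` strings used in the question.

```java
import java.util.Objects;

// Hypothetical helper, NOT part of the Dataflow or Beam SDK.
// Models the rule from the answer: a Pub/Sub read names exactly one source.
final class PubsubSource {
    /** Which kind of Pub/Sub resource the read should use. */
    enum Kind { TOPIC, SUBSCRIPTION }

    private final Kind kind;
    private final String path;

    private PubsubSource(Kind kind, String path) {
        this.kind = kind;
        this.path = path;
    }

    /** Full topic name in the form projects/<project>/topics/<topic>. */
    static PubsubSource ofTopic(String project, String topic) {
        return new PubsubSource(Kind.TOPIC,
                String.format("projects/%s/topics/%s",
                        requireName(project), requireName(topic)));
    }

    /** Full subscription name: projects/<project>/subscriptions/<name>. */
    static PubsubSource ofSubscription(String project, String subscription) {
        return new PubsubSource(Kind.SUBSCRIPTION,
                String.format("projects/%s/subscriptions/%s",
                        requireName(project), requireName(subscription)));
    }

    /**
     * Chooses the source from two optional settings (null or empty means
     * "not set") and rejects the case where both or neither are given.
     */
    static PubsubSource fromOptions(String project, String topic, String subscription) {
        boolean hasTopic = topic != null && !topic.isEmpty();
        boolean hasSubscription = subscription != null && !subscription.isEmpty();
        if (hasTopic == hasSubscription) {
            throw new IllegalArgumentException(
                    "Specify exactly one of topic or subscription, got "
                            + (hasTopic ? "both" : "neither"));
        }
        return hasSubscription
                ? ofSubscription(project, subscription)
                : ofTopic(project, topic);
    }

    /** Rejects null, empty, or slash-containing short names. */
    private static String requireName(String name) {
        Objects.requireNonNull(name, "name must not be null");
        if (name.isEmpty() || name.contains("/")) {
            throw new IllegalArgumentException("Invalid short name: " + name);
        }
        return name;
    }

    Kind kind() { return kind; }

    String path() { return path; }
}
```

In a Dataflow 1.x pipeline you would then pass `source.path()` to either `.topic(...)` or `.subscription(...)` depending on `source.kind()`, never to both. Because the question wants to reuse an existing subscription, the subscription branch matches that intent; reading from a topic instead has Dataflow create a subscription of its own.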

Also, I assume you are using the Dataflow 1.9 SDK? You might want to consider moving to the new Apache Beam 2.0.0 release. You can find the reference for PubsubIO in that SDK here.
