Is it possible to read messages from PubSub and split their data into different elements of a PCollection? If so, how?

Date: 2022-11-08 15:22:44

Currently, I have the following code:

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"));

3 answers

#1


It looks like you want to read messages from Pub/Sub, split each one into multiple parts on space characters, and then feed those parts to the rest of your pipeline. PubsubIO needs no special configuration for this, because it is not a "reading data" problem; it is a "transforming data you have already read" problem. You simply need to insert a ParDo that takes each "composite" record and breaks it down the way you want, e.g.:

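import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"))
        // Break each composite message into space-separated parts and
        // emit every part as its own element of the output PCollection.
        .apply(ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String composite = c.element();
            for (String part : composite.split(" ")) {
              c.output(part);
            }
          }
        }));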
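To check the splitting logic outside a pipeline, here is a plain-Java sketch with no Dataflow/Beam dependency. The class and method names are hypothetical. It runs the same per-message loop as the DoFn but collects the parts into a list instead of calling c.output(...). It also shows one design choice worth considering: split(" ") turns repeated spaces into empty-string elements, whereas trimming and splitting on "\\s+" does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java helper (not part of any SDK): simulates what the
// DoFn emits for a single Pub/Sub message, without running a pipeline.
class SplitMessages {

  // Same loop as processElement: each part the DoFn would pass to
  // c.output(...) is added to the returned list instead.
  static List<String> explode(String composite) {
    List<String> parts = new ArrayList<>();
    for (String part : composite.split(" ")) {
      parts.add(part);
    }
    return parts;
  }

  // Variant that tolerates leading, trailing and repeated whitespace,
  // which split(" ") would turn into empty-string elements.
  static List<String> explodeOnWhitespace(String composite) {
    List<String> parts = new ArrayList<>();
    String trimmed = composite.trim();
    if (trimmed.isEmpty()) {
      return parts; // nothing to emit for a blank message
    }
    parts.addAll(Arrays.asList(trimmed.split("\\s+")));
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(explode("User 1234 bought item A")); // [User, 1234, bought, item, A]
    System.out.println(explode("a  b"));                    // [a, , b]
    System.out.println(explodeOnWhitespace("  a  b "));     // [a, b]
  }
}
```

If your messages may contain irregular spacing, the whitespace-tolerant variant avoids sending empty strings downstream.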

#2


I take it you mean that the data you want is spread across different elements of the PCollection, and that you want to extract it and group it somehow.

One possible approach is to write a DoFn that processes each String in the PCollection and outputs a key-value pair for each piece of data you want to group. You can then use the GroupByKey transform to bring all the related data together.

For example, suppose your PCollection contains the following messages from Pub/Sub:

  1. User 1234 bought item A
  2. User 1234 bought item B

The DoFn would output a key-value pair with the user ID as the key and the purchased item as the value: (<1234, A>, <1234, B>). The GroupByKey transform then groups the two values together into a single element, <1234, [A, B]>, on which you can perform further processing.

This is a very common pattern in big data processing, known as MapReduce.
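The map-then-group pattern can be sketched in plain Java with no Beam dependency, using the two example messages. The parsing rule, which assumes every message has the exact shape "User <id> bought item <item>", and the class and method names are hypothetical. In a real pipeline, GroupByKey does not guarantee the order of the grouped values, and applying it to an unbounded Pub/Sub source typically requires a non-global window or a trigger; this sketch ignores both concerns.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of "map to key-value pairs, then
// group by key". Assumes messages look like "User <id> bought item <item>".
class GroupPurchases {

  // "Map" step: the key-value pair the DoFn would emit for one message.
  static Map.Entry<String, String> toKeyValue(String message) {
    String[] words = message.trim().split("\\s+");
    if (words.length != 5) {
      throw new IllegalArgumentException("Unexpected message: " + message);
    }
    return new SimpleEntry<>(words[1], words[4]); // (userId, item)
  }

  // "GroupByKey" step: collect every value that shares a key into one list.
  static Map<String, List<String>> groupByKey(List<String> messages) {
    Map<String, List<String>> grouped = new TreeMap<>();
    for (String message : messages) {
      Map.Entry<String, String> kv = toKeyValue(message);
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList(
        "User 1234 bought item A",
        "User 1234 bought item B");
    System.out.println(groupByKey(messages)); // {1234=[A, B]}
  }
}
```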

#3


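You can output an Iterable<A> for each element and then use Flatten (Flatten.iterables()) to squash the resulting PCollection<Iterable<A>> into a PCollection<A>. Unsurprisingly, this is termed flatMap in many next-gen data processing platforms; cf. Spark and Flink.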
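A plain-Java sketch (no Beam dependency; class and method names are hypothetical) of the two equivalent shapes: first mapping each message to a list of parts and then flattening, versus doing both in a single flatMap. Here java.util.stream stands in for the pipeline transforms; it only illustrates the semantics, not how a runner executes them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of "emit an Iterable, then flatten" vs. flatMap.
class FlatMapSketch {

  // Step 1: one list of parts per message (analogous to a PCollection<Iterable<String>>).
  static List<List<String>> toIterables(List<String> messages) {
    return messages.stream()
        .map(m -> Arrays.asList(m.split(" ")))
        .collect(Collectors.toList());
  }

  // Step 2: squash the nested lists into a single flat list.
  static List<String> flatten(List<List<String>> nested) {
    return nested.stream().flatMap(List::stream).collect(Collectors.toList());
  }

  // Both steps fused into one operation: flatMap.
  static List<String> flatMap(List<String> messages) {
    return messages.stream()
        .flatMap(m -> Arrays.stream(m.split(" ")))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList("a b", "c");
    System.out.println(toIterables(messages));          // [[a, b], [c]]
    System.out.println(flatten(toIterables(messages))); // [a, b, c]
    System.out.println(flatMap(messages));              // [a, b, c]
  }
}
```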
