是否可以从PubSub读取消息并将其数据分离到PCollection 的不同元素中?如果是这样,怎么样?

时间:2022-11-08 15:22:44

Now, I have the below code:


PCollection<String> input_data =

3 个解决方案



Looks like you want to read some messages from pubsub and convert each of them to multiple parts by splitting a message on space characters, and then feed the parts to the rest of your pipeline. No special configuration of PubsubIO is needed, because it's not a "reading data" problem - it's a "transforming data you have already read" problem - you simply need to insert a ParDo which takes your "composite" record and breaks it down in the way you want, e.g.:

看起来您想要从pubsub读取一些消息,并通过在空格字符上拆分消息将它们转换为多个部分,然后将这些部分提供给管道的其余部分。不需要PubsubIO的特殊配置,因为它不是“读取数据”问题 - 它是“你已经读过的数据转换”问题 - 你只需要插入一个ParDo,它可以获取你的“复合”记录并将其分解为你想要的方式,例如:

PCollection<String> input_data =
    .apply(ParDo.of(new DoFn<String, String>() {
      public void processElement(ProcessContext c) {
        String composite = c.element();
        for (String part : composite.split(" ")) {



I take it you mean that the data you want is present in different elements of the PCollection and want to extract and group it somehow.


A possible approach is to write a DoFn function that processes each String in the PCollection. You output a key value pair for each piece of data you want to group. You can then use the GroupByKey transform to group all the relevant data together.


For example you have the following messages from pubsub in your PCollection:


  1. User 1234 bought item A
  2. 用户1234购买了商品A.
  3. User 1234 bought item B
  4. 用户1234购买了商品B.

The DoFn function will output a key value pair with the user id as key and the item bought as value. ( <1234,A> , <1234, B> ). Using the GroupByKey transform you group the two values together in one element. You can then perform further processing on that element.

DoFn函数将输出一个键值对,其中用户ID为键,项目作为值购买。 (<1234,A>,<1234,B>)。使用GroupByKey变换,您可以将两个值组合在一个元素中。然后,您可以对该元素执行进一步处理。

This is a very common pattern in bigdata called mapreduce.




You can output an Iterable<A> then use Flatten to squash it. Unsurprisingly this is termed flatMap in many next-gen data processing platforms, c.f. spark / flink.

您可以输出Iterable 然后使用Flatten来压缩它。不出所料,这在许多下一代数据处理平台中被称为flatMap,c.f。火花/叮当声。



Looks like you want to read some messages from pubsub and convert each of them to multiple parts by splitting a message on space characters, and then feed the parts to the rest of your pipeline. No special configuration of PubsubIO is needed, because it's not a "reading data" problem - it's a "transforming data you have already read" problem - you simply need to insert a ParDo which takes your "composite" record and breaks it down in the way you want, e.g.:

看起来您想要从pubsub读取一些消息,并通过在空格字符上拆分消息将它们转换为多个部分,然后将这些部分提供给管道的其余部分。不需要PubsubIO的特殊配置,因为它不是“读取数据”问题 - 它是“你已经读过的数据转换”问题 - 你只需要插入一个ParDo,它可以获取你的“复合”记录并将其分解为你想要的方式,例如:

PCollection<String> input_data =
    .apply(ParDo.of(new DoFn<String, String>() {
      public void processElement(ProcessContext c) {
        String composite = c.element();
        for (String part : composite.split(" ")) {



I take it you mean that the data you want is present in different elements of the PCollection and want to extract and group it somehow.


A possible approach is to write a DoFn function that processes each String in the PCollection. You output a key value pair for each piece of data you want to group. You can then use the GroupByKey transform to group all the relevant data together.


For example you have the following messages from pubsub in your PCollection:


  1. User 1234 bought item A
  2. 用户1234购买了商品A.
  3. User 1234 bought item B
  4. 用户1234购买了商品B.

The DoFn function will output a key value pair with the user id as key and the item bought as value. ( <1234,A> , <1234, B> ). Using the GroupByKey transform you group the two values together in one element. You can then perform further processing on that element.

DoFn函数将输出一个键值对,其中用户ID为键,项目作为值购买。 (<1234,A>,<1234,B>)。使用GroupByKey变换,您可以将两个值组合在一个元素中。然后,您可以对该元素执行进一步处理。

This is a very common pattern in bigdata called mapreduce.




You can output an Iterable<A> then use Flatten to squash it. Unsurprisingly this is termed flatMap in many next-gen data processing platforms, c.f. spark / flink.

您可以输出Iterable 然后使用Flatten来压缩它。不出所料,这在许多下一代数据处理平台中被称为flatMap,c.f。火花/叮当声。