数据流如何组合PCollections替换对象

时间:2022-01-05 15:34:09

I want to implement a process where I load 2 Kinds of data, lets say Kind A and B, PCollection<A> a1, PCollection<B> b1. Then I create a View.asMap() from a1 and give it to a DoFn dfn1 as sideinput that is applied on b1. This DoFn uses some of the values of Kind A and outputs them. Afterwards, I want to create a new PCollection<A> a2 that holds all the objects of a1, but replaces the ones that were outputted by dfn1.

我想实现一个加载2种数据的过程,比如说A类和B类,PCollection a1,PCollection b1。然后我从a1创建一个View.asMap(),并将其作为一个应用于b1的sideinput提供给DoFn dfn1。该DoFn使用A类的一些值并输出它们。之后,我想创建一个新的PCollection a2,它包含a1的所有对象,但是替换了dfn1输出的对象。

Lets say a1 holds Objects o1, b1, c1, d1, e1, f1, g1 dfn1 manipulates and outputs b1 -> b2, c1 -> c2, g1 -> g2 to PCollection<A> a2

让我们说a1持有对象o1,b1,c1,d1,e1,f1,g1 dfn1操纵并输出b1 - > b2,c1 - > c2,g1 - > g2到PCollection a2

the new PCollection combined from a1 and a2 should contain o1, b2, c2, e1, f1, g2

从a1和a2组合的新PCollection应包含o1,b2,c2,e1,f1,g2

Is there a built-in mechanism to accomplish something like that? The collections may be keyed before the "merge".

是否有内置机制来完成这样的事情?可以在“合并”之前键入集合。

Thanks in advance.

提前致谢。

As i am unsatisfied by my english explanation of the problem, here is a DoFn which performs what I was asking for. The real question is, if there is a built-in transform that can do something like this, best would be without manually creating a view before.

由于我对这个问题的英语解释不满意,这里有一个执行我要求的DoFn。真正的问题是,如果存在可以执行此类操作的内置转换,那么最好不要先手动创建视图。

public class CombineKvCollectionsWithMasterCollection extends DoFn<KV<String, Object>, Object>{
    private static final long serialVersionUID = 4100849850259729106L;

    private PCollectionView<Map<String, Object>> masterView;

    public CombineKvCollectionsWithMasterCollection(PCollectionView<Map<String, Object>> masterView) {
        this.masterView = masterView;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        KV<String, Object> kv = c.element();
        Map<String, Object> masterMap = c.sideInput(masterView);
        if (masterMap.containsKey(kv.getKey())) {
            c.output(masterMap.get(kv.getKey()));
        } else {
            c.output(kv.getValue());
        }
    }
}

1 个解决方案

#1


-1  

The Combine function does the basic functions like Sum, Min, Max and Mean. For a specific combine functionality, you would need to provide some processing logic. So, there is no in-built function that would do this for now.

Combine功能执行Sum,Min,Max和Mean等基本功能。对于特定的组合功能,您需要提供一些处理逻辑。因此,目前没有内置功能可以做到这一点。

#1


-1  

The Combine function does the basic functions like Sum, Min, Max and Mean. For a specific combine functionality, you would need to provide some processing logic. So, there is no in-built function that would do this for now.

Combine功能执行Sum,Min,Max和Mean等基本功能。对于特定的组合功能,您需要提供一些处理逻辑。因此,目前没有内置功能可以做到这一点。