如何在Apache Beam中使用ParDo和DoFn创建读变换

时间:2022-06-08 15:38:14

According to the Apache Beam documentation the recommended way to write simple sources is by using Read Transforms and ParDo. Unfortunately the Apache Beam docs has let me down here.

根据Apache Beam文档,编写简单源的推荐方法是使用Read Transforms和ParDo。不幸的是,Apache Beam文档让我失望了。

I'm trying to write a simple unbounded data source which emits events using a ParDo but the compiler keeps complaining about the input type of the DoFn object:

我正在尝试编写一个简单的*数据源,它使用ParDo发出事件,但编译器一直在抱怨DoFn对象的输入类型:

message: 'The method apply(PTransform<? super PBegin,OutputT>) in the type PBegin is not applicable for the arguments (ParDo.SingleOutput<PBegin,Event>)'

My attempt:

我的尝试:

public class TestIO extends PTransform<PBegin, PCollection<Event>> {

    @Override
    public PCollection<Event> expand(PBegin input) {
        return input.apply(ParDo.of(new ReadFn()));
    }

    private static class ReadFn extends DoFn<PBegin, Event> {
        @ProcessElement
        public void process(@TimerId("poll") Timer pollTimer) {
            Event testEvent = new Event(...);

            //custom logic, this can happen infinitely
            for(...) {
                context.output(testEvent);
            }
        }
    }
}

1 个解决方案

#1


0  

A DoFn performs element-wise processing. As written, ParDo.of(new ReadFn()) will have type PTransform<PCollection<PBegin>, PCollection<Event>>. Specifically, the ReadFn indicates it takes an element of type PBegin and returns 0 or more elements of type Event.

DoFn执行逐元素处理。如上所述,ParDo.of(new ReadFn())将具有类型PTransform ,PCollection >。具体来说,ReadFn表示它采用PBegin类型的元素并返回0或更多类型为Event的元素。

Instead, you should use an actual Read operation. There are a variety provided. You can also use Create if you have a specific set of in-memory collections to use.

相反,您应该使用实际的Read操作。提供了各种各样的。如果要使用一组特定的内存中集合,也可以使用Create。

If you need to create a custom source you should use the Read transform. Since you're using timers, you likely want to create an Unbounded Source (a stream of elements).

如果需要创建自定义源,则应使用“读取”转换。由于您使用的是计时器,因此您可能希望创建一个*源(元素流)。

#1


0  

A DoFn performs element-wise processing. As written, ParDo.of(new ReadFn()) will have type PTransform<PCollection<PBegin>, PCollection<Event>>. Specifically, the ReadFn indicates it takes an element of type PBegin and returns 0 or more elements of type Event.

DoFn执行逐元素处理。如上所述,ParDo.of(new ReadFn())将具有类型PTransform ,PCollection >。具体来说,ReadFn表示它采用PBegin类型的元素并返回0或更多类型为Event的元素。

Instead, you should use an actual Read operation. There are a variety provided. You can also use Create if you have a specific set of in-memory collections to use.

相反,您应该使用实际的Read操作。提供了各种各样的。如果要使用一组特定的内存中集合,也可以使用Create。

If you need to create a custom source you should use the Read transform. Since you're using timers, you likely want to create an Unbounded Source (a stream of elements).

如果需要创建自定义源,则应使用“读取”转换。由于您使用的是计时器,因此您可能希望创建一个*源(元素流)。