Using a DoFn multiple times in Java Dataflow 1.9.0

Time: 2021-07-19 15:35:28

I want to make one part of my Pipeline depend on dynamic input to the Pipeline creation process. What is the recommended way to do so?

If I have the following (reduced) code:

public static void createPipeline(){
  Pipeline p = TestPipeline.create();
  p.apply(new Source()).apply(ParDo.of(new DoFunction()));
  p.apply(new AnotherSource()).apply(ParDo.of(new DoFunction()));
  p.run();
}

Now DoFunction should become a parameter. Should I instantiate it once and pass the instance to createPipeline, or should I pass a Class parameter and instantiate it inside the method? Version with an instantiated function:

public static void createPipeline(DoFn dofn){
  Pipeline p = TestPipeline.create();
  p.apply(new Source()).apply(ParDo.of(dofn));
  p.apply(new AnotherSource()).apply(ParDo.of(dofn));
  p.run();
}

Version with a class parameter:

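public static void createPipeline(Class<? extends DoFn> fnClass) throws Exception {
  Pipeline p = TestPipeline.create();
  // A fresh DoFn is created reflectively for each application (needs a no-arg constructor).
  p.apply(new Source()).apply(ParDo.of(fnClass.newInstance()));
  p.apply(new AnotherSource()).apply(ParDo.of(fnClass.newInstance()));
  p.run();
}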

1 solution

#1

There is no need to pass a Class; you can pass the DoFn instance directly.

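import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

// Foo and Baz are placeholder element types; Source and AnotherSource are the asker's sources.
public static void createPipeline(DoFn<Foo, Baz> dofn) {
  Pipeline pipeline = TestPipeline.create();

  // The same DoFn instance is wrapped in a ParDo and applied to the first input...
  pipeline
      .apply(Read.from(new Source()))
      .apply(ParDo.of(dofn));

  // ...and again to the second input.
  pipeline
      .apply(Read.from(new AnotherSource()))
      .apply(ParDo.of(dofn));

  pipeline.run();
}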

You can even build the ParDo.of(dofn) transform once, pass that fully instantiated transform instead, and apply it multiple times.
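Why sharing one instance is safe, roughly speaking: a DoFn is Serializable, and the runner ships it to the workers in serialized form, where each worker deserializes its own instances. The object you construct while building the pipeline therefore acts as a template. Its constructor arguments travel with it (which is how dynamic configuration gets in, something Class.newInstance() with a no-arg constructor cannot supply), while state mutated in one deserialized copy does not leak into another. The sketch below models this with plain java.io serialization; it is not Dataflow SDK code, PrefixFn and roundTrip are hypothetical names, and the round trip only approximates what the runner does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-Java model of "construct once, ship serialized copies"; not the Dataflow SDK's code.
public class SharedFnCopies {

    // Hypothetical stand-in for a DoFn: Serializable and configured through its constructor.
    public static class PrefixFn implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String prefix; // configuration fixed when the pipeline is built
        private int processed = 0;   // mutable state local to one copy

        public PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        public String process(String element) {
            processed++;
            return prefix + element;
        }

        public int processedCount() {
            return processed;
        }
    }

    // Serialize and deserialize an object, producing an independent copy.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PrefixFn shared = new PrefixFn("x-"); // built once, like the DoFn passed to createPipeline
        PrefixFn copyA = roundTrip(shared);   // models the copy behind the first ParDo
        PrefixFn copyB = roundTrip(shared);   // models the copy behind the second ParDo

        System.out.println(copyA.process("a")); // x-a
        System.out.println(copyB.process("b")); // x-b
        copyA.process("c");
        // Each copy counts only its own elements; the original is never touched.
        System.out.println(copyA.processedCount() + " " + copyB.processedCount() + " "
                + shared.processedCount()); // 2 1 0
    }
}
```

Both copies keep the "x-" configuration, but their counters are independent, which is the property that makes passing one configured instance (or one ParDo built from it) to several applications reasonable.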

If you are not going to use the PCollections returned by the ParDos, you can also just flatten the two inputs together into one PCollection and apply the ParDo once.
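A plain-Java model of that equivalence, assuming lists stand in for PCollections and a java.util.function.Function stands in for the DoFn (this is not Dataflow code; in the Dataflow 1.x SDK the merge step would be the Flatten.pCollections() transform applied to a PCollectionList). Applying the function to each input separately yields the same elements as flattening first and applying it once. Because PCollections are unordered, the results are compared as multisets. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java model: Lists stand in for PCollections and a Function stands in for the DoFn.
public class FlattenThenApply {

    // Two separate applications of the same function, outputs merged afterwards.
    public static <A, B> List<B> applySeparately(List<A> first, List<A> second, Function<A, B> fn) {
        List<B> out = new ArrayList<>();
        for (A e : first) out.add(fn.apply(e));
        for (A e : second) out.add(fn.apply(e));
        return out;
    }

    // Inputs flattened into one collection first, then a single application of the function.
    public static <A, B> List<B> flattenThenApply(List<A> first, List<A> second, Function<A, B> fn) {
        return Stream.concat(first.stream(), second.stream())
                .map(fn)
                .collect(Collectors.toList());
    }

    // PCollections are unordered, so compare the results as sorted lists (multisets).
    public static <T extends Comparable<T>> boolean sameElements(List<T> x, List<T> y) {
        List<T> a = new ArrayList<>(x);
        List<T> b = new ArrayList<>(y);
        a.sort(null); // null comparator means natural ordering
        b.sort(null);
        return a.equals(b);
    }

    public static void main(String[] args) {
        List<String> fromSource = Arrays.asList("b", "a");
        List<String> fromAnotherSource = Arrays.asList("c");
        List<String> separate = applySeparately(fromSource, fromAnotherSource, String::toUpperCase);
        List<String> flattened = flattenThenApply(fromSource, fromAnotherSource, String::toUpperCase);
        System.out.println(separate + " " + flattened + " " + sameElements(separate, flattened));
    }
}
```

The trade-off the model makes visible: after flattening there is only one output collection, so you can no longer tell which input an output element came from. That is why this shortcut only fits when the per-input outputs are not needed.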
