Apache Beam - Unable to infer a Coder on a DoFn with multiple output tags

Time: 2020-12-04 15:38:00

I am trying to execute a pipeline using Apache Beam, but I get an error when I try to add output tags:

import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;

import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The Transformer.
 */
class Transformer {
    final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
    final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();

    /**
     * The entry point of the application.
     *
     * @param args the input arguments
     */
    public static void main(String... args) {
        TransformerOptions options = PipelineOptionsFactory.fromArgs(args)

        Pipeline p = Pipeline.create(options);

        p.apply("Input", PubsubIO
                        ParDo.of(new JsonTransformer())


    /**
     * Deserialize the input and convert it to a key-value pairs map.
     */
    static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {

        /**
         * Process each element.
         *
         * @param c the processing context
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = new String(c.element().getPayload());
            try {
                Type type = new TypeToken<Map<String, String>>() {}.getType();
                Gson gson = new Gson();
                Map<String, String> map = gson.fromJson(messagePayload, type);
            } catch (Exception e) {
                LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
                String attributes = c.element()
                        .entrySet().stream().map((entry) ->
                                String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
                c.output(failedTransformation, Tuple.of(attributes, messagePayload));


The error shown is:


Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Transform.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed.

I tried different ways to fix the issue, but I think I simply do not understand what the problem is. I know that these lines cause the error:



but I do not understand which part of them is at fault, which part needs a specific Coder, and what "V" refers to in the error (from "Unable to provide a Coder for V").


Why is the error happening? I also looked at Apache Beam's docs, but they do not seem to explain this usage, and I did not understand much from the section on coders.



2 Answers



First, I would suggest the following -- change:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<>();

into this:


final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<Map<String, String>>() {};
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<Tuple<String, String>>() {};

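That should help coder inference determine the type of the side output: the trailing {} creates an anonymous subclass, which keeps the generic type argument available to reflection, whereas new TupleTag<>() leaves only the erased type variable V of TupleTag<V>. Also, have you properly registered a CoderProvider for Tuple?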
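
The effect of the trailing {} can be reproduced without Beam. The sketch below uses a hypothetical Tag<V> class (a stand-in for illustration, not Beam's TupleTag) and plain Java reflection. A plain instance exposes only the declared type variable, while an anonymous subclass exposes the full type argument. The names TagTypeDemo, Tag and describeType are assumptions made for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

class TagTypeDemo {
    /** Hypothetical stand-in for a generic tag class; this is not Beam's TupleTag. */
    static class Tag<V> {
        /** Describes the element type as far as reflection can see it. */
        String describeType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                // Anonymous subclass: the type argument is recorded in its class file.
                return ((ParameterizedType) superType)
                        .getActualTypeArguments()[0].getTypeName();
            }
            // Plain instance: only the declared type variable is visible.
            return Tag.class.getTypeParameters()[0].getName();
        }
    }

    public static void main(String[] args) {
        Tag<Map<String, String>> erased = new Tag<>();
        Tag<Map<String, String>> captured = new Tag<Map<String, String>>() {};
        System.out.println(erased.describeType());   // only the type variable name
        System.out.println(captured.describeType()); // the full Map type
    }
}
```

The first line printed is the bare type variable name, which mirrors the "V" in the error message; the second names the concrete Map type.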

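On the question of a coder for Tuple: Beam needs a Coder for the element type of every PCollection so that elements can be turned into bytes and back. The sketch below is not Beam's Coder API. It only illustrates, with plain JDK streams, the encode/decode round trip that a coder for a pair of strings would have to perform. StringPair, encode and decode are hypothetical names, and StringPair stands in for Tuple<String, String>.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class PairCoderSketch {
    /** Hypothetical immutable pair of strings, standing in for Tuple<String, String>. */
    static final class StringPair {
        final String first;
        final String second;

        StringPair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Writes both fields in a fixed order. */
    static byte[] encode(StringPair pair) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // writeUTF length-prefixes each string (limited to 65535 encoded bytes),
            // so the two fields can be read back unambiguously.
            out.writeUTF(pair.first);
            out.writeUTF(pair.second);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    static StringPair decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String first = in.readUTF();
            String second = in.readUTF();
            return new StringPair(first, second);
        }
    }

    public static void main(String[] args) throws IOException {
        StringPair original = new StringPair("key -> value\n", "{\"broken\": json");
        StringPair copy = decode(encode(original));
        System.out.println(copy.first.equals(original.first)
                && copy.second.equals(original.second));
    }
}
```

In a real pipeline the same idea would be expressed through Beam's own coder classes, set on the output PCollection with .setCoder() as the error message suggests, or registered with the CoderRegistry.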



Thanks to @Ben Chambers' answer. The Kotlin equivalent is:

val successTag: TupleTag<MyObj> = object : TupleTag<MyObj>() {}
val deadLetterTag: TupleTag<String> = object : TupleTag<String>() {}


