How can I use a custom Coder in a PCollection&lt;KV&lt;String, MyClass&gt;&gt;?

Asked: 2021-11-28 15:31:10

I'm trying to use a custom Coder so that I can do some transforms, but I'm having trouble getting the PCollection to use my custom coder, and I suspect (???) it's because it's wrapped in a KV. Specifically:


Pipeline p = Pipeline.create ...
p.getCoderRegistry().registerCoder(MyClass.class, MyClassCoder.class);

...

PCollection<String> input = ...
PCollection<KV<String, MyClass>> t = input.apply(new ToKVTransform());

When I try to run something like this, I get a java.lang.ClassCastException and a stack trace that includes SerializableCoder instead of the MyClassCoder I would expect.


[error]     at com.google.cloud.dataflow.sdk.coders.SerializableCoder.decode(SerializableCoder.java:133)
[error]     at com.google.cloud.dataflow.sdk.coders.SerializableCoder.decode(SerializableCoder.java:50)
[error]     at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:95)
[error]     at com.google.cloud.dataflow.sdk.coders.KvCoder.decode(KvCoder.java:42)

I see that the answer to another, somewhat related question (Using TextIO.Write with a complicated PCollection type in Google Cloud Dataflow) says to map everything to strings, and use that to pass stuff around PCollections. Is that really the recommended way??


(Note: the actual code is in Scala, but I'm pretty sure it's not a Scala <=> Java issue so I've translated it into Java here.)


Update to include Scala code and more background:


So this is the actual exception itself (should have included this at the beginning):


java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field com.example.schema.Schema.keyTypes of type scala.collection.immutable.Map in instance of com.example.schema.Schema

Where com.example.schema.Schema is:


case class Schema(id: String, keyTypes: Map[String, Type])

And lastly, the SchemaCoder is:


class SchemaCoder extends com.google.cloud.dataflow.sdk.coders.CustomCoder[Schema] {
  def decode(inputStream: InputStream, context: Context): Schema = {
    val ois = new ObjectInputStream(inputStream)
    val id: String = ois.readObject().asInstanceOf[String]
    val javaMap: java.util.Map[String, Type] = ois.readObject().asInstanceOf[java.util.Map[String, Type]]
    ois.close()

    Schema(id, javaMap.asScala.toMap)
  }

  def encode(schema: Schema, outputStream: OutputStream, context: Context): Unit = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    oos.writeObject(schema.id)
    val javaMap: java.util.Map[String, Type] = schema.keyTypes.asJava
    oos.writeObject(javaMap)
    oos.close()

    val encoded = new String(Base64.encodeBase64(baos.toByteArray()))
    outputStream.write(encoded.getBytes())
  }
}
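As a side note, the encode/decode pair above is not symmetric: encode Base64-encodes the serialized bytes before writing them, but decode constructs an ObjectInputStream directly on the raw stream, so a round trip would fail even once the coder is picked up. A minimal, SDK-free sketch of a symmetric round trip, using a plain String/Map pair as a stand-in for Schema (the class SchemaRoundTrip and its method names are invented for illustration):

```java
import java.io.*;
import java.util.*;

// Stand-in for com.example.schema.Schema: an id plus a key -> type-name map.
// The point: decode must consume exactly the byte layout encode produced,
// with no extra Base64 layer on one side only.
public class SchemaRoundTrip {

    static void encode(String id, Map<String, String> keyTypes, OutputStream out)
            throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(out);
        oos.writeObject(id);
        oos.writeObject(new HashMap<>(keyTypes)); // HashMap is Serializable
        oos.flush();
    }

    static Map.Entry<String, Map<String, String>> decode(InputStream in)
            throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(in);
        String id = (String) ois.readObject();
        @SuppressWarnings("unchecked")
        Map<String, String> keyTypes = (Map<String, String>) ois.readObject();
        return new AbstractMap.SimpleEntry<>(id, keyTypes);
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        encode("user-schema", Map.of("age", "Int"), baos);
        Map.Entry<String, Map<String, String>> back =
                decode(new ByteArrayInputStream(baos.toByteArray()));
        System.out.println(back.getKey() + " " + back.getValue()); // prints "user-schema {age=Int}"
    }
}
```

Also note that closing the stream inside a coder (as `ois.close()` does above) is risky in practice, since the runner may still need the underlying stream; flushing is enough.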

====

====

Edit2: And here's what ToKVTransform actually looks like:


class SchemaExtractorTransform extends PTransform[PCollection[String], PCollection[Schema]] {
  class InferSchemaFromStringWithKeyFn extends DoFn[String, KV[String, Schema]] {
    override def processElement(c: DoFn[String, KV[String, Schema]]#ProcessContext): Unit = {
      val line = c.element()
      c.output(inferSchemaFromString(line))
    }
  }

  class GetFirstFn extends DoFn[KV[String, java.lang.Iterable[Schema]], Schema] {
    override def processElement(c: DoFn[KV[String, java.lang.Iterable[Schema]], Schema]#ProcessContext): Unit = {
      val idAndSchemas: KV[String, java.lang.Iterable[Schema]] = c.element()
      val it: java.util.Iterator[Schema] = idAndSchemas.getValue().iterator()
      c.output(it.next())
    }
  }

  override def apply(inputLines: PCollection[String]): PCollection[Schema] = {
    val schemasWithKey: PCollection[KV[String, Schema]] = inputLines.apply(
      ParDo.named("InferSchemas").of(new InferSchemaFromStringWithKeyFn())
    )

    val keyed: PCollection[KV[String, java.lang.Iterable[Schema]]] = schemasWithKey.apply(
      GroupByKey.create()
    )

    val schemasOnly: PCollection[Schema] = keyed.apply(
      ParDo.named("GetFirst").of(new GetFirstFn())
    )

    schemasOnly
  }
}

1 Answer

#1

This problem doesn't reproduce in Java; Scala is doing something different with types that breaks Dataflow's coder inference. To work around this, you can call setCoder on a PCollection to set its Coder explicitly, for example:


schemasWithKey.setCoder(KvCoder.of(StringUtf8Coder.of(), SchemaCoder.of()));

Here's the Java version of your code, just to make sure that it's doing approximately the same thing:


public static class SchemaExtractorTransform
  extends PTransform<PCollection<String>, PCollection<Schema>> {
  class InferSchemaFromStringWithKeyFn extends DoFn<String, KV<String, Schema>> {
    public void processElement(ProcessContext c) {
      c.output(KV.of(c.element(), new Schema()));
    }
  }

  class GetFirstFn extends DoFn<KV<String, java.lang.Iterable<Schema>>, Schema> {
    private static final long serialVersionUID = 0;
    public void processElement(ProcessContext c) {
      c.output(c.element().getValue().iterator().next());
    }
  }

  public PCollection<Schema> apply(PCollection<String> inputLines) {
    PCollection<KV<String, Schema>> schemasWithKey = inputLines.apply(
        ParDo.named("InferSchemas").of(new InferSchemaFromStringWithKeyFn()));

    PCollection<KV<String, java.lang.Iterable<Schema>>> keyed =
        schemasWithKey.apply(GroupByKey.<String, Schema>create());

    PCollection<Schema> schemasOnly =
        keyed.apply(ParDo.named("GetFirst").of(new GetFirstFn()));

    return schemasOnly;
  }
}
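One detail worth noting: the setCoder line calls SchemaCoder.of(), which assumes SchemaCoder exposes a static of() factory the way Dataflow's built-in coders (StringUtf8Coder, KvCoder, ...) do; the SchemaCoder in the question doesn't define one. A minimal, SDK-free sketch of that factory convention (the Coder interface and the names OfFactoryPattern/StringCoder here are simplified stand-ins, not the SDK's):

```java
import java.io.*;

// Illustrative stand-in for the static of() factory convention used by
// Dataflow coders: of() returns a shared, stateless singleton instance.
public class OfFactoryPattern {

    interface Coder<T> extends Serializable {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    static final class StringCoder implements Coder<String> {
        private static final StringCoder INSTANCE = new StringCoder();

        private StringCoder() {} // no public constructor; callers use of()

        public static StringCoder of() {
            return INSTANCE;
        }

        @Override
        public void encode(String value, OutputStream out) throws IOException {
            new DataOutputStream(out).writeUTF(value);
        }

        @Override
        public String decode(InputStream in) throws IOException {
            return new DataInputStream(in).readUTF();
        }
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        StringCoder.of().encode("hello", baos);
        System.out.println(
                StringCoder.of().decode(new ByteArrayInputStream(baos.toByteArray()))); // prints "hello"
    }
}
```

Coders are stateless, so a singleton factory is cheap and lets the same instance be reused everywhere a coder for that type is needed.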
