Apache Beam dynamic destinations with a custom destination data type

Time: 2021-09-17 15:35:07

I am using AvroIO.writeCustomTypeToGenericRecords to write messages based on the type of the event message. The DestinationT is a custom bean class that implements Serializable. I get the following error when running the code:

java.lang.RuntimeException: org.apache.beam.sdk.coders.Coder$NonDeterministicException: org.apache.beam.sdk.coders.SerializableCoder@5d436f5b is not deterministic because: Java Serialization may be non-deterministic.

It looks like I have to create a coder for this custom bean class.
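
For context, the message "Java Serialization may be non-deterministic" can be illustrated without Beam. The JDK-only sketch below serializes two maps that are equal but were created with different initial capacities, then compares the resulting bytes. The class name SerializationDeterminismDemo, its helper methods, and the choice of HashMap are assumptions made for illustration; they do not come from the original post.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SerializationDeterminismDemo {

  // Serialize any object with plain Java serialization.
  static byte[] serialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Build a one-entry map with the given initial capacity.
  static Map<String, String> mapWithCapacity(int capacity) {
    Map<String, String> map = new HashMap<>(capacity);
    map.put("eventType", "click");
    return map;
  }

  public static void main(String[] args) {
    Map<String, String> a = mapWithCapacity(16);
    Map<String, String> b = mapWithCapacity(64);
    // The maps hold the same entries, so equals() is true ...
    System.out.println("equal objects: " + a.equals(b));
    // ... but HashMap also serializes its internal capacity, so the bytes differ.
    System.out.println("equal bytes:   " + Arrays.equals(serialize(a), serialize(b)));
  }
}
```

A coder that compares values by their encoded bytes cannot rely on such an encoding, which is presumably why SerializableCoder refuses to declare itself deterministic.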


1 solution



I was able to solve this problem by creating a bean class, for example:


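public class TestBean {

  private String field1;

  // no-argument constructor, used by Jackson when decoding
  public TestBean() {}

  public TestBean(String field1) {
    this.field1 = field1;
  }

  // getter and setter methods for each property
  public String getField1() {
    return field1;
  }

  public void setField1(String field1) {
    this.field1 = field1;
  }
}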


Then I added a coder for it:


import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Coder for TestBean: writes the bean as a JSON string via StringUtf8Coder.
 */
public class TestBeanCoder
        extends AtomicCoder<TestBean> {

  private static final TestBeanCoder INSTANCE = new TestBeanCoder();

  private static final TypeDescriptor<TestBean> TYPE_DESCRIPTOR =
          new TypeDescriptor<TestBean>() {};

  // static, so the mapper is shared and not serialized with the coder
  private static final ObjectMapper MAPPER =
          new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

  /**
   * Returns the singleton; the static factory must be named of().
   */
  public static TestBeanCoder of() {
    return INSTANCE;
  }

  /**
   * Private constructor; use of() instead.
   */
  private TestBeanCoder() {}

  /**
   * Encodes a TestBean as a JSON string.
   *
   * @param value the bean to encode, must not be null
   * @param outStream the stream to write to
   * @throws IOException if writing fails
   * @throws CoderException if value is null
   */
  @Override
  public void encode(TestBean value, OutputStream outStream)
          throws IOException, CoderException {
    if (value == null) {
      throw new CoderException("cannot encode a null TestBean");
    }
    String strValue = MAPPER.writeValueAsString(value);
    StringUtf8Coder.of().encode(strValue, outStream);
  }

  /**
   * Decodes a TestBean from the JSON string in the input stream.
   *
   * @param inStream the stream to read from
   * @return the decoded bean
   * @throws IOException if reading or parsing fails
   */
  @Override
  public TestBean decode(InputStream inStream) throws IOException {
    String strValue = StringUtf8Coder.of().decode(inStream);
    return MAPPER.readValue(strValue, TestBean.class);
  }

  // Not throwing here declares the coder deterministic.
  @Override
  public void verifyDeterministic() {}

  // Only valid if TestBean defines equals() and hashCode() over its fields.
  @Override
  public boolean consistentWithEquals() {
    return true;
  }

  @Override
  public boolean isRegisterByteSizeObserverCheap(TestBean value) {
    return true;
  }

  @Override
  public TypeDescriptor<TestBean> getEncodedTypeDescriptor() {
    return TYPE_DESCRIPTOR;
  }
}
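
The contract such a coder has to meet can be checked without Beam: equal values must encode to identical bytes, and decoding the bytes must give back an equal value. The sketch below is a JDK-only analogue, not Beam's API or wire format. The class DeterministicEncodingSketch, the nested Bean type (a stand-in for TestBean with value-based equals and hashCode), and the 4-byte length prefix are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class DeterministicEncodingSketch {

  // Hypothetical stand-in for TestBean, with value-based equality.
  static final class Bean {
    final String field1;

    Bean(String field1) {
      this.field1 = Objects.requireNonNull(field1, "field1");
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Bean && field1.equals(((Bean) o).field1);
    }

    @Override
    public int hashCode() {
      return field1.hashCode();
    }
  }

  // Encode as a 4-byte length followed by the UTF-8 bytes of field1.
  static byte[] encode(Bean value) {
    if (value == null) {
      throw new IllegalArgumentException("cannot encode a null Bean");
    }
    byte[] utf8 = value.field1.getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(utf8.length);
      out.write(utf8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Read the length prefix, then exactly that many UTF-8 bytes.
  static Bean decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      byte[] utf8 = new byte[in.readInt()];
      in.readFully(utf8);
      return new Bean(new String(utf8, StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Bean a = new Bean("click");
    Bean b = new Bean("click");
    // Equal values produce identical bytes.
    System.out.println("same bytes: " + Arrays.equals(encode(a), encode(b)));
    // Decoding gives back a value equal to the original.
    System.out.println("round trip: " + decode(encode(a)).equals(a));
  }
}
```

Without the equals and hashCode overrides on the bean, the round-trip check would fail, because two separately decoded instances would only be compared by identity. The same consideration applies to TestBean when the coder returns true from consistentWithEquals().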

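Finally, register the coder with the pipeline's coder registry (Scala syntax; pipeline stands for your Pipeline instance):


pipeline.getCoderRegistry.registerCoderProvider(
  CoderProviders.fromStaticMethods(classOf[TestBean], classOf[TestBeanCoder]))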

The same can be done with Scala case classes.




