I am using AvroIO.writeCustomTypeToGenericRecords to write messages based on the type of the event message. The DestinationT is a custom bean class that implements Serializable. I get the following error when running the code:
java.lang.RuntimeException: org.apache.beam.sdk.coders.Coder$NonDeterministicException: org.apache.beam.sdk.coders.SerializableCoder@5d436f5b is not deterministic because: Java Serialization may be non-deterministic.
It looks like I have to create a coder for this custom bean class.
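To illustrate what the error message is warning about, here is a minimal sketch in plain Java (no Beam involved). The class and method names are hypothetical, chosen only for this illustration. It builds two HashMaps that are equal but were created with different initial capacities, and compares their Java-serialized bytes. A coder used for grouping or as a destination key needs equal values to produce identical bytes, which Java serialization does not guarantee.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class (not part of Beam): equal objects can
// produce different bytes under standard Java serialization.
class SerializationDeterminismDemo {

    // Serialize any object with standard Java serialization.
    static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Build a one-entry map with the given initial capacity.
    static Map<String, Integer> mapWithCapacity(int capacity) {
        Map<String, Integer> map = new HashMap<>(capacity);
        map.put("event", 1);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Integer> small = mapWithCapacity(16);
        Map<String, Integer> large = mapWithCapacity(64);
        // The maps hold the same entry, so equals() is true ...
        System.out.println("equal values: " + small.equals(large));
        // ... but the serialized form also records the internal table
        // size, so the byte arrays differ.
        System.out.println("equal bytes:  "
            + Arrays.equals(serialize(small), serialize(large)));
    }
}
```

The same effect can show up in a bean that holds hash-based collections, which is presumably why SerializableCoder refuses to claim determinism for any type.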
1 solution
#1
I was able to solve this problem by creating a bean class, for example:
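public class TestBean {
    private String field1;

    // No-argument constructor, needed by Jackson when decoding.
    public TestBean() {
    }

    public TestBean(String field1) {
        this.field1 = field1;
    }

    // Getter and setter methods for each property.
    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }
}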
Then I added a coder for it:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Coder for TestBean: encodes the bean as a JSON string.
 */
public class TestBeanCoder extends AtomicCoder<TestBean> {
    // Shared mapper; static so that it is not serialized with the coder.
    private static final ObjectMapper MAPPER =
        new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
    // Singleton instance.
    private static final TestBeanCoder INSTANCE = new TestBeanCoder();
    private static final TypeDescriptor<TestBean> TYPE_DESCRIPTOR =
        new TypeDescriptor<TestBean>() {};

    /**
     * Returns the singleton instance. The factory method is required to be named of().
     */
    public static TestBeanCoder of() {
        return INSTANCE;
    }

    /**
     * Private constructor; use of() instead.
     */
    private TestBeanCoder() {
    }

    /**
     * Encodes a TestBean as a JSON string written with StringUtf8Coder.
     */
    @Override
    public void encode(TestBean value, OutputStream outStream)
            throws IOException, CoderException {
        if (value == null) {
            throw new CoderException("cannot encode a null TestBean");
        }
        String strValue = MAPPER.writeValueAsString(value);
        StringUtf8Coder.of().encode(strValue, outStream);
    }

    /**
     * Decodes a TestBean from the JSON string in the input stream.
     */
    @Override
    public TestBean decode(InputStream inStream) throws IOException {
        String strValue = StringUtf8Coder.of().decode(inStream);
        return MAPPER.readValue(strValue, TestBean.class);
    }

    // Declares the encoding deterministic. This only holds if the JSON
    // produced for equal beans is always identical.
    @Override
    public void verifyDeterministic() {
    }

    // Only correct if TestBean implements equals() over the encoded fields.
    @Override
    public boolean consistentWithEquals() {
        return true;
    }

    @Override
    public boolean isRegisterByteSizeObserverCheap(TestBean value) {
        return true;
    }

    @Override
    public TypeDescriptor<TestBean> getEncodedTypeDescriptor() {
        return TYPE_DESCRIPTOR;
    }
}
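As a rough, dependency-free illustration of the property the coder above has to provide, the following sketch encodes a single String field as a length prefix followed by UTF-8 bytes, so equal values always produce identical bytes. This is not Beam code and not the exact wire format of StringUtf8Coder; the class name and the 4-byte length prefix are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch (plain JDK, not Beam): a deterministic encoding
// for a value with one String field, like field1 in TestBean.
class DeterministicEncodingSketch {

    // Encode as a 4-byte length followed by the UTF-8 bytes.
    static byte[] encode(String field1) {
        if (field1 == null) {
            throw new IllegalArgumentException("cannot encode null");
        }
        byte[] utf8 = field1.getBytes(StandardCharsets.UTF_8);
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(utf8.length);
            out.write(utf8);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the length prefix, then exactly that many UTF-8 bytes.
    static String decode(byte[] encoded) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(encoded))) {
            byte[] utf8 = new byte[in.readInt()];
            in.readFully(utf8);
            return new String(utf8, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] first = encode("click");
        byte[] second = encode(new String("click")); // distinct but equal String
        System.out.println("same bytes: " + Arrays.equals(first, second));
        System.out.println("round trip: " + decode(first));
    }
}
```

The JSON-based coder has this property as long as Jackson writes the fields of equal beans in the same order every time. For a simple bean like TestBean that seems a reasonable assumption, but it is worth checking for beans that contain maps or sets.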
Then register the coder with the pipeline (Scala):
val coder = CoderProviders.fromStaticMethods(classOf[TestBean], classOf[TestBeanCoder])
pipeline.getCoderRegistry.registerCoderProvider(coder)
The same can be done with Scala case classes.