Infinite recursion when emitting a nested TableRow in Google Cloud Dataflow

Time: 2021-12-22 14:29:13

I'm trying to pass a TableRow I've generated between stages of my pipeline, and I get the following error:

Exception in thread "main" 
com.google.cloud.dataflow.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream 
[... exception propagation ...] 
Caused by: com.fasterxml.jackson.databind.JsonMappingException: 
Infinite recursion (StackOverflowError) (through reference chain: 
com.google.protobuf.Descriptors$Descriptor["file"]
->com.google.protobuf.Descriptors$FileDescriptor["messageTypes"]
->java.util.Collections$UnmodifiableRandomAccessList[0]-> 
[... many, many lines of this ...]
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:733)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContentsUsing(IndexedListSerializer.java:142)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:88)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:717)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:717)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContentsUsing(IndexedListSerializer.java:142)
[... many, many lines of this ...]
Caused by: java.lang.StackOverflowError
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:736)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:717)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContentsUsing(IndexedListSerializer.java:142)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:88)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
[... snip ...]
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:717)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)

I'm constructing my TableRow recursively from a Google protobuf via its Descriptor - I traverse the descriptor depth-first recursively (since protobufs may have nested definitions) and build the TableRow as I traverse. Below is an excerpt from the TableRow creation class:

public void processElement(ProcessContext c) throws Exception {
    TableRow row = getTableRow(c.element());
    LOG.info(row.toPrettyString());
    c.output(row);
}

private TableRow getTableRow(TMessage message) throws Exception {
    TableRow row = new TableRow();
    encode(message, row);
    return row;
}

private TableCell getTableCell(TMessage message) throws Exception {
    TableCell cell = new TableCell();
    encode(message, cell);
    return cell;
}

private void encode(TMessage message, GenericJson row) throws Exception {
    Descriptors.Descriptor descriptor = message.getDescriptorForType();
    List<Descriptors.FieldDescriptor> fields = descriptor.getFields();

    for (Descriptors.FieldDescriptor fieldDescriptor : fields) {
        Descriptors.FieldDescriptor.Type fieldType = fieldDescriptor.getType();

        switch (fieldType) {
            case DOUBLE:
            case FLOAT:
            case INT64:
            case UINT64:
            case INT32:
            case FIXED64:
            case FIXED32:
            case UINT32:
            case SFIXED32:
            case SFIXED64:
            case SINT32:
            case SINT64:
            case BOOL:
            case STRING:
            case BYTES:
            case ENUM:
                if (fieldDescriptor.isRepeated()) {
                    List<Object> tableCells = new ArrayList<>();

                    tableCells.addAll((List<?>) message.getField(fieldDescriptor));

                    row.set(fieldDescriptor.getName(), tableCells);
                }
                else {
                    row.set(fieldDescriptor.getName(), message.getField(fieldDescriptor));
                }

                break;
            case MESSAGE:
                if (fieldDescriptor.isRepeated()) {
                    List<TableRow> tableRows = new ArrayList<>();
                    for (Object o : (List<?>) message.getField(fieldDescriptor)) {
                        TMessage nestedMessage = (TMessage) o;
                        TableRow tableRow = getTableRow(nestedMessage);
                        tableRows.add(tableRow);
                    }
                    row.set(fieldDescriptor.getName(), tableRows);
                }
                else {
                    row.set(fieldDescriptor.getName(), getTableCell((TMessage) message.getField(fieldDescriptor)));
                }
                break;
            case GROUP:
                throw new Exception("groups are deprecated");
        }

    }
}

I believe that the TableRow is being created correctly because I've both tested this DoFn with some simple dummy data and looked at the result of the TableRow creation on a subset of my dataset (see the snippet above, where I LOG.info the result of the TableRow encoding), and the resulting TableRow seems to contain all of the data I expect with no extra fields.

1 answer

#1


2  

Based on the stack trace and the code, it looks like something in the Protocol Buffer message may be self-referential. The JSON encoding is failing while following these references.

Looking at the code, my guess is that you are encountering an enum. The Protocol Buffers documentation for getField says that, for an enum field, it returns an EnumValueDescriptor rather than a plain string or number.

The EnumValueDescriptor has a link to its FileDescriptor, which has a list of all of its EnumDescriptors, each of which links back to the FileDescriptor, and so on. A serializer that follows every reference it finds never reaches the end of that chain, which matches the repeating "file" -> "messageTypes" pattern in your stack trace.
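
To make the cycle concrete, here is a minimal sketch in plain Java. The classes FileDesc, EnumDesc and EnumValueDesc are hypothetical stand-ins I made up for illustration; they are not the real protobuf classes and only mimic the back-references described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for protobuf's descriptor classes; NOT the real API. */
public class DescriptorCycleSketch {
    static class FileDesc {
        // The file knows every enum declared in it.
        final List<EnumDesc> enumTypes = new ArrayList<>();
    }

    static class EnumDesc {
        final FileDesc file; // back-reference to the owning file
        final List<EnumValueDesc> values = new ArrayList<>();

        EnumDesc(FileDesc file) {
            this.file = file;
            file.enumTypes.add(this);
        }
    }

    static class EnumValueDesc {
        final String name;
        final EnumDesc type; // back-reference to the owning enum

        EnumValueDesc(String name, EnumDesc type) {
            this.name = name;
            this.type = type;
            type.values.add(this);
        }
    }

    /** Builds one file containing one enum with one value, and returns that value. */
    static EnumValueDesc sampleValue() {
        FileDesc file = new FileDesc();
        EnumDesc color = new EnumDesc(file);
        return new EnumValueDesc("RED", color);
    }

    /** Follows value -> enum -> file -> enum list -> file and checks we are back at the same file object. */
    static boolean returnsToSameFile(EnumValueDesc value) {
        FileDesc file = value.type.file;
        return file.enumTypes.get(0).file == file;
    }

    public static void main(String[] args) {
        System.out.println(returnsToSameFile(sampleValue())); // prints "true"
    }
}
```

A serializer that treats such an object as a bean and writes out every property walks this loop until the stack overflows, whereas writing only the name ("RED" here) terminates immediately.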

If you handle the ENUM case specially, so that no protobuf objects end up as values in the JSON Map (for example by storing the enum value's name instead of the descriptor), it should fix your problem.
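
In the ENUM branch itself, one option is to store ((Descriptors.EnumValueDescriptor) value).getName() for a singular field, and a list of those names for a repeated one. To check that nothing else slips through, a small guard like the following may help. It is only a sketch: JsonSafeCheck and findUnsafe are names I made up, and it assumes the row can be treated as a java.util.Map of plain values (TableRow extends GenericJson, which is Map-based) and that the row is a tree with no cycles among its maps and lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reports values that are not plain JSON-style types. */
public class JsonSafeCheck {
    /** Returns one "path -> class name" entry for every value that is not JSON-friendly. */
    static List<String> findUnsafe(Map<String, ?> row) {
        List<String> problems = new ArrayList<>();
        visit("", row, problems);
        return problems;
    }

    private static void visit(String path, Object value, List<String> problems) {
        // Scalars that a JSON encoder can write directly.
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return;
        }
        if (value instanceof Map) {
            // Nested rows and cells: recurse into each entry.
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                String key = String.valueOf(e.getKey());
                visit(path.isEmpty() ? key : path + "." + key, e.getValue(), problems);
            }
        } else if (value instanceof List) {
            // Repeated fields: recurse into each element.
            List<?> list = (List<?>) value;
            for (int i = 0; i < list.size(); i++) {
                visit(path + "[" + i + "]", list.get(i), problems);
            }
        } else {
            // Anything else (for example a descriptor object) is flagged.
            problems.add(path + " -> " + value.getClass().getName());
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 42L);
        row.put("status", new Object()); // stands in for a leaked descriptor
        System.out.println(findUnsafe(row)); // prints "[status -> java.lang.Object]"
    }
}
```

Calling findUnsafe(row) just before c.output(row) and logging or throwing on a non-empty result points directly at the field that holds a non-JSON value, instead of a stack overflow deep inside the encoder.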
