SerializableCoder cannot be cast to BeamRecordCoder

Date: 2022-11-23 15:36:23

I built a PCollection<BeamRecord> from a file containing JSON objects, using the Beam SQL SDK.

The code below parses the JSON lines and maps them to ChatHistory objects, then converts the mapped objects to BeamRecord. Finally, I try to run BeamSql on the resulting PCollection<BeamRecord>, but I get the exception "SerializableCoder cannot be cast to BeamRecordCoder".

  PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));

  // Convert them to BeamRecords with the same schema as defined above via a DoFn.
  PCollection<BeamRecord> apps = json_objects.apply(
      ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            List<String> fields_list = new ArrayList<String>(Arrays.asList("conversation_id","message_type","message_date","message","message_auto_id"));
            List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR));
            BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
            BeamRecord br = new BeamRecord(
                          brtype,
                          c.element().conversation_id,
                          c.element().message_type,
                          c.element().message_date,
                          c.element().message,
                          c.element().message_auto_id
                          );
            c.output(br);
        }
      }));

  return apps.apply(
      BeamSql
          .query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION")
   );

Here is the generated stack trace:

java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables (BeamSql.java:173)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:153)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollectionTuple.apply (PCollectionTuple.java:160)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:246)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:186)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:123)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:1)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3.main (trial3.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)

I saw a similar post, "Running BeamSql WithoutCoder or Making Coder Dynamic", but it did not fix my error.

Best regards!

1 Answer

#1


Ismail, in your case calling .setCoder() should work.

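The ParDo output has no explicit coder, so Beam falls back to SerializableCoder for it, while BeamSql expects a BeamRecordCoder. I would try extracting the row type out of the ParDo, and then setting its coder on apps before applying the SQL query: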

  PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));

  // Create a row type first:
  List<String> fields_list= new ArrayList<String>(Arrays.asList("conversation_id","message_type","message_date","message","message_auto_id"));
  List<Integer> types_list= new ArrayList<Integer>(Arrays.asList(Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR));
  final BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);

  // Convert them to BeamRecords with the same schema as defined above via a DoFn.
  PCollection<BeamRecord> apps = json_objects.apply(
      ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            BeamRecord br = new BeamRecord(
                          brtype, 
                          c.element().conversation_id, 
                          c.element().message_type, 
                          c.element().message_date,
                          c.element().message, 
                          c.element().message_auto_id
                          );
            c.output(br);
        }
      })); 

  return apps
    .setCoder(brtype.getRecordCoder())
    .apply(
      BeamSql
          .query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION")
   ); 
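
To illustrate why the explicit setCoder() call matters, here is a simplified model in plain Java. It is not Beam's code: RecordCollection, RecordCoder, SerializableCoder, and registerTable are hypothetical stand-ins, and the sketch only assumes what the stack trace suggests, namely that the SQL step downcasts the collection's coder to a record coder in order to reach the schema.

```java
public class CoderCastSketch {

    /** Hypothetical stand-in for a coder. */
    public interface Coder { }

    /** Generic fallback coder, used when nothing more specific was set. */
    public static final class SerializableCoder implements Coder { }

    /** Schema-aware coder: carries the field names of the records it encodes. */
    public static final class RecordCoder implements Coder {
        final String[] fieldNames;

        public RecordCoder(String... fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    /** Hypothetical stand-in for a collection whose coder defaults to the fallback. */
    public static final class RecordCollection {
        private Coder coder = new SerializableCoder(); // inferred default

        public RecordCollection setCoder(Coder coder) {
            this.coder = coder;
            return this;
        }

        public Coder getCoder() {
            return coder;
        }
    }

    /** Stand-in for the SQL step: it needs the schema, so it downcasts the coder. */
    public static int registerTable(RecordCollection input) {
        // Throws ClassCastException when the coder is still the generic fallback.
        RecordCoder recordCoder = (RecordCoder) input.getCoder();
        return recordCoder.fieldNames.length;
    }

    public static void main(String[] args) {
        RecordCollection withoutCoder = new RecordCollection();
        try {
            registerTable(withoutCoder);
        } catch (ClassCastException e) {
            System.out.println("Without setCoder: ClassCastException");
        }

        RecordCollection withCoder = new RecordCollection()
            .setCoder(new RecordCoder("conversation_id", "message_type"));
        System.out.println("With setCoder: " + registerTable(withCoder) + " fields registered");
    }
}
```

In this model the downcast only succeeds once the schema-aware coder has been attached, which mirrors the effect of calling .setCoder(brtype.getRecordCoder()) before the query in the snippet above.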

A couple of examples:

  • This example sets the coder by using Create.withCoder(), which does the same thing;
  • This example filters and converts from Events using ToRow.parDo() and then also sets the coder, which is specified here.

Please note that BeamRecord has since been renamed to Row; this and a few other changes are reflected in the examples above.
