I prepared a PCollection<BeamRecord> from a file containing JSON objects using the Beam SQL SDK.
The code below parses and maps the JSON lines to ChatHistory objects, then converts the mapped objects to BeamRecord. Finally, I try to apply BeamSql to the returned PCollection<BeamRecord>, but I get the exception SerializableCoder cannot be cast to BeamRecordCoder.
PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));

// Convert them to BeamRecords with the same schema as defined above via a DoFn.
PCollection<BeamRecord> apps = json_objects.apply(
    ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        List<String> fields_list = new ArrayList<String>(Arrays.asList("conversation_id", "message_type", "message_date", "message", "message_auto_id"));
        List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR));
        BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);
        BeamRecord br = new BeamRecord(
            brtype,
            c.element().conversation_id,
            c.element().message_type,
            c.element().message_date,
            c.element().message,
            c.element().message_auto_id);
        c.output(br);
      }
    }));

return apps.apply(
    BeamSql.query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION"));
Here is the generated stack trace:
java.lang.ClassCastException: org.apache.beam.sdk.coders.SerializableCoder cannot be cast to org.apache.beam.sdk.coders.BeamRecordCoder
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.registerTables (BeamSql.java:173)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:153)
at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand (BeamSql.java:116)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollectionTuple.apply (PCollectionTuple.java:160)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:246)
at org.apache.beam.sdk.extensions.sql.BeamSql$SimpleQueryTransform.expand (BeamSql.java:186)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:123)
at com.mdm.trial.trial3$JsonParse.expand (trial3.java:1)
at org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform (Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply (PCollection.java:286)
at com.mdm.trial.trial3.main (trial3.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
I saw a similar post, but it still didn't fix my error: Running BeamSql WithoutCoder or Making Coder Dynamic.
Best regards!
1 Answer
#1
Ismail, in your case using .setCoder() should work.
I would try extracting the row type out of the ParDo, and then applying it to apps before applying the SQL query:
PCollection<ChatHistory> json_objects = lines.apply(ParDo.of(new ExtractObjectsFn()));

// Create a row type first:
List<String> fields_list = new ArrayList<String>(Arrays.asList("conversation_id", "message_type", "message_date", "message", "message_auto_id"));
List<Integer> types_list = new ArrayList<Integer>(Arrays.asList(Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR));
final BeamRecordSqlType brtype = BeamRecordSqlType.create(fields_list, types_list);

// Convert them to BeamRecords with the same schema as defined above via a DoFn.
PCollection<BeamRecord> apps = json_objects.apply(
    ParDo.of(new DoFn<ChatHistory, BeamRecord>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        BeamRecord br = new BeamRecord(
            brtype,
            c.element().conversation_id,
            c.element().message_type,
            c.element().message_date,
            c.element().message,
            c.element().message_auto_id);
        c.output(br);
      }
    }));

return apps
    .setCoder(brtype.getRecordCoder())
    .apply(
        BeamSql.query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION"));
A couple of examples:
- This example sets the coder by using Create.withCoder(), which does the same thing (a minimal sketch of that approach follows this list);
- This example filters and converts from Events using ToRow.parDo() and then also sets the coder, which is specified here.
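For illustration, here is a minimal sketch of the Create.withCoder() approach (not the linked example itself); the pipeline variable p and the sample field values are made up, and brtype is the BeamRecordSqlType defined above:

// Hypothetical sample record; "p" and the field values are illustrative only.
PCollection<BeamRecord> sample = p.apply(
    Create.of(new BeamRecord(brtype, "conv-1", "inbound", "2018-01-01", "hello", "1"))
        .withCoder(brtype.getRecordCoder()));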
Please note that BeamRecord has since been renamed to Row, and a few other changes are reflected in the examples above.
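For anyone on a newer Beam release, here is a minimal sketch of roughly the same pipeline using the Row/Schema and SqlTransform APIs; the exact classes and methods are assumptions about the newer SDK, so verify them against the Beam version you use:

// A minimal sketch, assuming a newer Beam release where BeamRecord/BeamRecordSqlType
// became Row/Schema (org.apache.beam.sdk.values.Row, org.apache.beam.sdk.schemas.Schema)
// and BeamSql.query became SqlTransform.query.
final Schema schema = Schema.builder()
    .addStringField("conversation_id")
    .addStringField("message_type")
    .addStringField("message_date")
    .addStringField("message")
    .addStringField("message_auto_id")
    .build();

PCollection<Row> rows = json_objects
    .apply(ParDo.of(new DoFn<ChatHistory, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(Row.withSchema(schema)
            .addValues(
                c.element().conversation_id,
                c.element().message_type,
                c.element().message_date,
                c.element().message,
                c.element().message_auto_id)
            .build());
      }
    }))
    .setRowSchema(schema);  // plays the role of setCoder(brtype.getRecordCoder())

return rows.apply(
    SqlTransform.query("SELECT conversation_id, message_type, message, message_date, message_auto_id FROM PCOLLECTION"));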