Here's my code
这是我的代码
PCollection<MyProto> pCollection = p.apply(TextIO.Read.from(
"gs://my_bucket/*")
.withCoder(Proto2Coder.of(MyProto.class)));
but this fails with the error
但这失败了,错误
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
The file when downloaded locally parses fine.
本地下载的文件解析得很好。
I've also tried to do the same thing by using a StringUtf8Coder and ByteArrayCoder, but no dice.
我也尝试使用StringUtf8Coder和ByteArrayCoder做同样的事情,但没有骰子。
any help? Should I not be using TextIO? What other options do I have?
任何帮助?我不应该使用TextIO吗?我还有其他选择吗?
1 个解决方案
#1
3
TextIO splits the file into lines and applies the coder to each line. Naturally, that doesn't work well with formats that are not line-based. I suppose that your files contain a single serialized proto each, correct? In that case you have 2 options:
TextIO将文件拆分为行并将编码器应用于每一行。当然,这不适用于非基于行的格式。我想你的文件每个包含一个序列化的proto,对吗?在这种情况下,您有两个选择:
- Create your own Source and Reader classes (see generic documentation on creating sources and sinks) by subclassing FileBasedFormat.
- 通过继承FileBasedFormat,创建自己的Source和Reader类(请参阅有关创建源和接收器的通用文档)。
- Treat the act of processing all your files as a ParDo - create an in-memory PCollection containing the filenames to process (using
Create.of()
) and pipe it through a ParDo that takes a filename and parses the file as a protobuf; then pipe to the rest of your pipeline. - 将处理所有文件的行为视为ParDo - 创建一个内存中的PCollection,其中包含要处理的文件名(使用Create.of())并通过ParDo传递它,该ParDo采用文件名并将文件解析为protobuf;然后管道到你的管道的其余部分。
The second is easier but the first will work better if you have really a lot of files.
第二个更容易,但如果你真的有很多文件,第一个会更好。
#1
3
TextIO splits the file into lines and applies the coder to each line. Naturally, that doesn't work well with formats that are not line-based. I suppose that your files contain a single serialized proto each, correct? In that case you have 2 options:
TextIO将文件拆分为行并将编码器应用于每一行。当然,这不适用于非基于行的格式。我想你的文件每个包含一个序列化的proto,对吗?在这种情况下,您有两个选择:
- Create your own Source and Reader classes (see generic documentation on creating sources and sinks) by subclassing FileBasedFormat.
- 通过继承FileBasedFormat,创建自己的Source和Reader类(请参阅有关创建源和接收器的通用文档)。
- Treat the act of processing all your files as a ParDo - create an in-memory PCollection containing the filenames to process (using
Create.of()
) and pipe it through a ParDo that takes a filename and parses the file as a protobuf; then pipe to the rest of your pipeline. - 将处理所有文件的行为视为ParDo - 创建一个内存中的PCollection,其中包含要处理的文件名(使用Create.of())并通过ParDo传递它,该ParDo采用文件名并将文件解析为protobuf;然后管道到你的管道的其余部分。
The second is easier but the first will work better if you have really a lot of files.
第二个更容易,但如果你真的有很多文件,第一个会更好。