Google Cloud Dataflow: Unable to parse proto using TextIO.Read

Time: 2021-08-14 15:24:04

Here's my code:

PCollection<MyProto> pCollection = p.apply(TextIO.Read.from(
            "gs://my_bucket/*")
            .withCoder(Proto2Coder.of(MyProto.class)));

But this fails with the error:

Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).

The file when downloaded locally parses fine.

I've also tried to do the same thing by using a StringUtf8Coder and ByteArrayCoder, but no dice.

Any help? Should I not be using TextIO? What other options do I have?

1 Answer

#1


3  

TextIO splits the file into lines and applies the coder to each line. Naturally, that doesn't work well with formats that are not line-based. I suppose that your files contain a single serialized proto each, correct? In that case you have 2 options:

  • Create your own Source and Reader classes by subclassing FileBasedSource (see the generic documentation on creating sources and sinks).
  • Treat the processing of your files as a ParDo: create an in-memory PCollection containing the filenames to process (using Create.of()) and pipe it through a ParDo that takes a filename and parses the whole file as a protobuf; then pipe the result to the rest of your pipeline.
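
The core of the ParDo option is "one filename in, one parsed record out". Here is a minimal sketch of that per-file step using only the JDK. The class name WholeFileParse, the method parseEach and the stand-in parser are hypothetical; in a real pipeline this logic would sit inside the function you pass to ParDo, the filenames would come from Create.of(), the bytes would be read from GCS rather than the local filesystem, and the parser would be your generated proto's parseFrom.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class WholeFileParse {
    /**
     * Reads every file completely and hands the raw bytes to the parser,
     * so each file yields exactly one record (no line splitting).
     */
    public static <T> List<T> parseEach(List<Path> files, Function<byte[], T> parser)
            throws IOException {
        List<T> records = new ArrayList<>();
        for (Path file : files) {
            byte[] bytes = Files.readAllBytes(file); // whole file, newlines included
            records.add(parser.apply(bytes));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical input: a temp file holding four bytes of binary data.
        Path tmp = Files.createTempFile("proto", ".bin");
        Files.write(tmp, new byte[] {0x0A, 0x02, 'h', 'i'});

        // Stand-in parser (assumption): just reports the byte count.
        // A real one would wrap a call to the generated parseFrom method.
        List<Integer> sizes = parseEach(Collections.singletonList(tmp), b -> b.length);
        System.out.println(sizes); // prints [4]

        Files.delete(tmp);
    }
}
```

Because the file is read as a single byte array, a 0x0A byte inside the payload is just data and never acts as a record boundary.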

The second is easier, but the first will work better if you have a very large number of files.
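
To illustrate the line-splitting problem described at the start of this answer, here is a small JDK-only sketch. It assumes a tiny hand-written payload (field 1, a length-delimited string "hi") and a simplified splitter that only approximates what a line-based reader does; NewlineSplitDemo, splitOnNewline and fieldNumber are hypothetical names. In the protobuf wire format the tag byte for field 1 with a length-delimited value is 0x0A, which is also the newline character, so serialized messages tend to contain "line breaks". A fragment that starts right after such a byte can begin with a tag whose field number is 0, which is one plausible way to end up with the "invalid tag (zero)" error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NewlineSplitDemo {
    /** Splits raw bytes on '\n' (0x0A), roughly as a line-based reader would. */
    public static List<byte[]> splitOnNewline(byte[] data) {
        List<byte[]> parts = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == '\n') {
                parts.add(Arrays.copyOfRange(data, start, i));
                start = i + 1;
            }
        }
        if (start < data.length) {
            parts.add(Arrays.copyOfRange(data, start, data.length));
        }
        return parts;
    }

    /** Field number encoded in a single-byte tag (tags below 128 only). */
    public static int fieldNumber(byte tagByte) {
        return (tagByte & 0xFF) >>> 3;
    }

    public static void main(String[] args) {
        // Hand-written wire bytes: tag 0x0A (field 1, length-delimited), length 2, "hi".
        byte[] serialized = {0x0A, 0x02, 'h', 'i'};

        for (byte[] part : splitOnNewline(serialized)) {
            String tagInfo = part.length == 0
                    ? "empty fragment"
                    : "first byte read as a tag gives field number " + fieldNumber(part[0]);
            System.out.println(Arrays.toString(part) + " -> " + tagInfo);
        }
    }
}
```

With this input the splitter produces an empty fragment followed by [2, 104, 105]; reading the leading 0x02 as a tag gives field number 0, which is not a valid protobuf field number.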
