Read and write serialized protobufs in Beam

Time: 2021-10-08 15:22:03

I assumed it would be fairly easy to write a PCollection of serialized protobuf messages to text files and read them back, but I failed after a few attempts. I would appreciate any comments.

// definition of proto.

syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}

The Python code below implements a simple Beam pipeline that turns lines of text into serialized protobufs and writes them to a text file.

# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

class ToProtoFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.number, phone.country = element.strip().split(',')
    yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.Create(["123-456-789,us", "345-567-789,ca"])
      | beam.ParDo(ToProtoFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-pb'))

The pipeline runs successfully and produces a file with this content:

$ cat ~/data/phone-pb-00000-of-00001 


123-456-789us


345-567-789ca

Then I wrote another pipeline to read the serialized protobufs and parse them with a ParDo.

class ToCsvFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.ParseFromString(element)
    yield ",".join([phone.number, phone.country])

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.io.ReadFromText('/Users/greeness/data/phone*')
      | beam.ParDo(ToCsvFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

Running it produces this error message:

  File "/Library/Python/2.7/site-packages/apache_beam/runners/common.py", line 458, in process_outputs
  for result in results:
  File "phone_example.py", line 37, in process
phone.ParseFromString(element)
  File "/Library/Python/2.7/site-packages/google/protobuf/message.py", line 185, in ParseFromString
  self.MergeFromString(serialized)
  File "/Library/Python/2.7/site-packages/google/protobuf/internal/python_message.py", line 1069, in MergeFromString
  raise message_mod.DecodeError('Truncated message.')
  DecodeError: Truncated message. [while running 'ParDo(ToCsvFn)']

So it looks like the serialized protobuf string cannot be parsed. Am I missing something? Thanks for any help!
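
One likely explanation can be sketched without Beam or phone_pb2. In the protobuf wire format, the tag byte for field 1 with a length-delimited type is 0x0A, which is the newline character, so every serialized PhoneNumber with a non-empty number starts with a newline. That would account for the blank lines in the cat output above, and a reader that splits on newlines would hand the parser fragments instead of whole messages. The encoder below is a hypothetical stand-in for SerializeToString(), written by hand for illustration and limited to short string fields; the split at the end only imitates line-based reading and is not Beam's actual implementation.

```python
def encode_string_field(field_number, value):
    """Encode one length-delimited (wire type 2) string field.

    Sketch only: handles payloads shorter than 128 bytes, where the
    length fits in a single varint byte.
    """
    data = value.encode("utf-8")
    assert len(data) < 128, "sketch only handles single-byte lengths"
    tag = (field_number << 3) | 2  # field 1 gives 0x0A, i.e. b"\n"
    return bytes([tag, len(data)]) + data


def encode_phone(number, country):
    """Hypothetical stand-in for PhoneNumber.SerializeToString()."""
    return encode_string_field(1, number) + encode_string_field(2, country)


records = [
    encode_phone("123-456-789", "us"),
    encode_phone("345-567-789", "ca"),
]

# A text sink terminates every element with a newline.
file_bytes = b"".join(record + b"\n" for record in records)

# A line-based reader splits on newlines, including the ones that are
# part of the serialized messages themselves.
lines = file_bytes.split(b"\n")

for line in lines:
    print(repr(line))
```

None of the resulting lines equals an original record: each message has lost its leading tag byte, so the remainder no longer decodes as a PhoneNumber.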

1 Solution

#1


I found a temporary workaround using Beam's existing tfrecordio.py (WriteToTFRecord and ReadFromTFRecord).

The code below works, but I am still open to any comment that solves the original problem with text files.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

def WriteTextToTFRecord():
  class ToProtoFn(beam.DoFn):
    def process(self, element):
      # Build a PhoneNumber message from a "number,country" line.
      phone = phone_pb2.PhoneNumber()
      phone.number, phone.country = element.strip().split(',')
      # Yield the message itself; the ProtoCoder below serializes it.
      yield phone
  with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | beam.Create(["123-456-789,us", "345-567-789,ca"])
    processed = (
        lines
        | beam.ParDo(ToProtoFn())
        # ProtoCoder takes the message class, not an instance.
        | beam.io.WriteToTFRecord('/Users/greeness/data/phone-pb',
                                  coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber)))

def ReadTFRecordAndSaveAsCSV():
  class ToCsvFn(beam.DoFn):
    def process(self, element):
      # element is already a decoded PhoneNumber message.
      yield ','.join([element.number, element.country])
  with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
      | beam.io.ReadFromTFRecord('/Users/greeness/data/phone-pb-*',
                                 coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber))
      | beam.ParDo(ToCsvFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

if __name__ == '__main__':
  WriteTextToTFRecord()
  ReadTFRecordAndSaveAsCSV()
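
If the output really has to stay in text files, one possible alternative (an assumption on my part, not something tested in a Beam pipeline here) is to base64-encode each serialized message before writing, so that no record contains a raw newline, and to decode it again after reading. The helper names to_text_line and from_text_line are hypothetical; in a pipeline they could be applied with beam.Map just before WriteToText and just after ReadFromText. The sketch below only checks the round trip on two sample byte strings and simulates the line-oriented file with a plain string.

```python
import base64


def to_text_line(serialized):
    """Encode serialized message bytes as newline-free ASCII text."""
    return base64.b64encode(serialized).decode("ascii")


def from_text_line(line):
    """Recover the original serialized bytes from one line of text."""
    return base64.b64decode(line)


# Sample payloads that contain raw newline bytes, like serialized
# protobuf messages whose first field is a string.
serialized = [
    b"\n\x0b123-456-789\x12\x02us",
    b"\n\x0b345-567-789\x12\x02ca",
]

# Simulate a text file: one encoded record per line.
file_text = "".join(to_text_line(s) + "\n" for s in serialized)

# Simulate line-based reading, then decode each line.
restored = [from_text_line(line) for line in file_text.splitlines()]

print(restored == serialized)
```

The trade-off is that base64 makes the files about a third larger and unreadable by eye, which is why a record-oriented format such as TFRecord may still be the better fit.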
