如何在Dataflow / Beam中的gcs中将结果写入JSON文件

时间:2022-05-04 15:36:13

I'm using the Python Beam SDK 0.6.0. And I would like to write my output to JSON in Google Cloud Storage. What is the best way to do this?

我正在使用Python Beam SDK 0.6.0。我想在Google云端存储中将输出写入JSON。做这个的最好方式是什么?

I quess I can use WriteToText from the Text IO sink but then I have to format each row separately, right? How do I save my result into valid JSON files that contain lists of objects?

我问我可以使用Text IO接收器中的WriteToText但是我必须分别格式化每一行,对吗?如何将结果保存到包含对象列表的有效JSON文件中?

3 个解决方案

#1


2  

Ok, for reference, I solved this by writing my own sink building on the _TextSink used by WriteToText in the beam SDK.

好的,作为参考,我通过在梁SDK中的WriteToText使用的_TextSink上编写我自己的接收器来解决这个问题。

Not sure if this is the best way to do it but it works well so far. Hope it might help someone else.

不确定这是否是最好的方法,但它到目前为止运作良好。希望它可以帮助别人。

import os
import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform   

class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        self.last_rows = dict()

    def open(self, temp_path):
        """ Open file and initialize it w opening a list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes a single encoded record converted to JSON and terminates the
        line w a comma."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')

        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write last row without a comma
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))

            # Close list and then the file
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)

Using the sink is similar to how you use the the text sink:

使用接收器与使用文本接收器的方式类似:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')

#2


1  

Making each file contain a single list with a bunch of elements is difficult, because you'd need to group a bunch of elements and then write them together to a file. Let me advice you to use a different format.

使每个文件包含一个包含一堆元素的列表很困难,因为您需要将一堆元素分组,然后将它们一起写入文件。我建议你使用不同的格式。

You may consider the JSON Lines format, where each line in a file represents a single JSON element.

您可以考虑JSON Lines格式,其中文件中的每一行代表一个JSON元素。

Transforming your data to JSON Lines should be pretty easy. The following transform should do the trick:

将数据转换为JSON行应该非常简单。以下转换应该可以解决问题:

class WriteToJsonLines(beam.PTransform):
    def __init__(self, file_name):
        self._file_name = file_name

    def expand(self, pcoll):
        return (pcoll
                | 'format json' >> beam.Map(json.dumps)
                | 'write to text' >> beam.WriteToText(self._file_name))

Finally, if you later on want to read your JSON Lines files, you can write your own JsonLinesSource or use the one in beam_utils.

最后,如果您以后想要读取JSON Lines文件,可以编写自己的JsonLinesSource或使用beam_utils中的JsonLinesSource。

#3


0  

Although this is a year late, I'd like to add another way to write a result to json files in GCS. For apache beam 2.x pipelines, this transform works:

虽然这已经晚了一年,但我想添加另一种方法将结果写入GCS中的json文件。对于apache beam 2.x管道,此转换有效:

.withSuffix(".json")

.withSuffix( “JSON”)

For example:

例如:

result.apply("WriteToGCS", TextIO.write().to(bucket)
            .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
            .withSuffix(".json")
            .withNumShards(chunks));

#1


2  

Ok, for reference, I solved this by writing my own sink building on the _TextSink used by WriteToText in the beam SDK.

好的,作为参考,我通过在梁SDK中的WriteToText使用的_TextSink上编写我自己的接收器来解决这个问题。

Not sure if this is the best way to do it but it works well so far. Hope it might help someone else.

不确定这是否是最好的方法,但它到目前为止运作良好。希望它可以帮助别人。

import os
import json

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform   

class _JsonSink(beam.io.FileSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        self.last_rows = dict()

    def open(self, temp_path):
        """ Open file and initialize it w opening a list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write('[\n')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes a single encoded record converted to JSON and terminates the
        line w a comma."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(',\n')

        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write last row without a comma
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))

            # Close list and then the file
            file_handle.write('\n]\n')
            file_handle.close()


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=beam.io.CompressionTypes.AUTO):

        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)

Using the sink is similar to how you use the the text sink:

使用接收器与使用文本接收器的方式类似:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')

#2


1  

Making each file contain a single list with a bunch of elements is difficult, because you'd need to group a bunch of elements and then write them together to a file. Let me advice you to use a different format.

使每个文件包含一个包含一堆元素的列表很困难,因为您需要将一堆元素分组,然后将它们一起写入文件。我建议你使用不同的格式。

You may consider the JSON Lines format, where each line in a file represents a single JSON element.

您可以考虑JSON Lines格式,其中文件中的每一行代表一个JSON元素。

Transforming your data to JSON Lines should be pretty easy. The following transform should do the trick:

将数据转换为JSON行应该非常简单。以下转换应该可以解决问题:

class WriteToJsonLines(beam.PTransform):
    def __init__(self, file_name):
        self._file_name = file_name

    def expand(self, pcoll):
        return (pcoll
                | 'format json' >> beam.Map(json.dumps)
                | 'write to text' >> beam.WriteToText(self._file_name))

Finally, if you later on want to read your JSON Lines files, you can write your own JsonLinesSource or use the one in beam_utils.

最后,如果您以后想要读取JSON Lines文件,可以编写自己的JsonLinesSource或使用beam_utils中的JsonLinesSource。

#3


0  

Although this is a year late, I'd like to add another way to write a result to json files in GCS. For apache beam 2.x pipelines, this transform works:

虽然这已经晚了一年,但我想添加另一种方法将结果写入GCS中的json文件。对于apache beam 2.x管道,此转换有效:

.withSuffix(".json")

.withSuffix( “JSON”)

For example:

例如:

result.apply("WriteToGCS", TextIO.write().to(bucket)
            .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
            .withSuffix(".json")
            .withNumShards(chunks));