Google Cloud Dataflow: Writing CSV from a Dictionary

Time: 2022-02-01 15:24:08

I have a dictionary of values that I would like to write to GCS as a valid .CSV file using the Python SDK. I can write the dictionary out as a newline-separated text file, but I can't find an example of converting the dictionary to a valid .CSV. Can anybody suggest the best way to generate CSVs within a Dataflow pipeline? The answers to this question address reading from CSV files, but not writing to them. I recognize that CSV files are just text files with rules, but I'm still struggling to convert the dictionary of data to a CSV that can be written using WriteToText.

Here is a simple example dictionary that I would like to turn into a CSV:

import apache_beam as beam

test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]

with beam.Pipeline() as p:
    (p
     | beam.Create(test_input)
     | beam.io.WriteToText(path_to_gcs))

The above would produce a text file with each dictionary written on its own line as its Python representation (e.g. {'label': 1, 'text': 'Here is a sentence'}), not as CSV. Is there any functionality within Apache Beam that I can take advantage of (similar to csv.DictWriter)?

2 solutions

#1

Generally you will want to write a function that can convert your original dict data elements into a csv-formatted string representation.
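
As a minimal sketch of such a function (Python 3, standard library only; the name dict_to_csv_line is hypothetical), csv.DictWriter can write a single row into an in-memory io.StringIO buffer, so quoting and escaping are handled by the csv module rather than by hand:

```python
import csv
import io


def dict_to_csv_line(element, fieldnames):
    """Hypothetical helper: format one dict as one CSV line, without a line terminator."""
    buf = io.StringIO()
    # csv.DictWriter picks values out of `element` in `fieldnames` order and
    # quotes any value that contains a comma, a quote character or a newline.
    csv.DictWriter(buf, fieldnames=fieldnames, dialect=csv.excel).writerow(element)
    line = buf.getvalue()
    # The excel dialect ends every row with '\r\n'; remove exactly that,
    # because WriteToText appends its own newline after each element.
    terminator = csv.excel.lineterminator
    if line.endswith(terminator):
        line = line[:-len(terminator)]
    return line


test_input = [{'label': 1, 'text': 'Here is a sentence'},
              {'label': 2, 'text': 'Another sentence goes here'}]
print([dict_to_csv_line(row, ['label', 'text']) for row in test_input])
# ['1,Here is a sentence', '2,Another sentence goes here']
```

Passing the field names explicitly fixes the column order independently of how each dictionary was built, and a value such as 'Hello, world' comes out quoted as "Hello, world".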

That function can be written as a DoFn and applied to your Beam PCollection with ParDo, which converts each element of the collection into the desired format. You can also wrap this DoFn in a more user-friendly PTransform.

You can learn more about this process in the Beam Programming Guide.

Here is a simple non-Beam example whose conversion function translates directly into a DoFn:

# Our example list of dictionary elements
test_input = [{'label': 1, 'text': 'Here is a sentence'},
             {'label': 2, 'text': 'Another sentence goes here'}]

def convert_my_dict_to_csv_record(input_dict):
    """ Turns dictionary values into a comma-separated value formatted string """
    return ','.join(map(str, input_dict.values()))

# Our converted list of elements
converted_test_input = [convert_my_dict_to_csv_record(element) for element in test_input]

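On Python 3.7+, where dictionaries preserve insertion order, converted_test_input will look like the following (on older Python versions the column order is not guaranteed):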

['1,Here is a sentence', '2,Another sentence goes here']
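
The plain ','.join approach is fine for simple values, but it does not quote anything, so a value that itself contains a comma silently adds a column. A quick standard-library check (the sample row is made up for illustration) parses each line back with csv.reader to count the columns a CSV consumer would see:

```python
import csv
import io

row = {'label': 3, 'text': 'Hello, world'}  # illustrative row containing a comma

# Naive conversion, as in the example above: no quoting at all
naive = ','.join(map(str, row.values()))

# csv.writer quotes the field that contains the delimiter
buf = io.StringIO()
csv.writer(buf).writerow(row.values())
quoted = buf.getvalue().rstrip('\r\n')


def column_count(line):
    """Parse a single CSV line and return how many columns a reader sees."""
    return len(next(csv.reader([line])))


print(naive, column_count(naive))    # 3,Hello, world 3
print(quoted, column_count(quoted))  # 3,"Hello, world" 2
```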

Beam DictToCSV DoFn and PTransform example using DictWriter

from csv import DictWriter
from csv import excel
from io import StringIO  # Python 3 (the original Python 2 code used cStringIO)

from apache_beam import DoFn, ParDo, PTransform

...

def _dict_to_csv(element, column_order, missing_val='', discard_extras=True, dialect=excel):
    """ Formats one dictionary as a single CSV line (without the line terminator).
        Delimiters, escape chars, etc. can be controlled via an instance of csv.Dialect.
    """

    buf = StringIO()

    writer = DictWriter(buf,
                        fieldnames=column_order,
                        restval=missing_val,
                        extrasaction=('ignore' if discard_extras else 'raise'),
                        dialect=dialect)
    writer.writerow(element)

    return buf.getvalue().rstrip(dialect.lineterminator)


class _DictToCSVFn(DoFn):
    """ Converts a Dictionary to a CSV-formatted String

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in the input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super().__init__()
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def process(self, element, *args, **kwargs):
        # Emit exactly one CSV-formatted line per input dictionary
        yield _dict_to_csv(element,
                           column_order=self._column_order,
                           missing_val=self._missing_val,
                           discard_extras=self._discard_extras,
                           dialect=self._dialect)

class DictToCSV(PTransform):
    """ Transforms a PCollection of Dictionaries to a PCollection of CSV-formatted Strings

        column_order: A tuple or list specifying the name of fields to be formatted as csv, in order
        missing_val: The value to be written when a named field from `column_order` is not found in an input element
        discard_extras: (bool) Behavior when additional fields are found in the dictionary input element
        dialect: Delimiters, escape-characters, etc can be controlled by providing an instance of csv.Dialect

    """

    def __init__(self, column_order, missing_val='', discard_extras=True, dialect=excel):
        super().__init__()
        self._column_order = column_order
        self._missing_val = missing_val
        self._discard_extras = discard_extras
        self._dialect = dialect

    def expand(self, pcoll):
        return pcoll | ParDo(_DictToCSVFn(column_order=self._column_order,
                                          missing_val=self._missing_val,
                                          discard_extras=self._discard_extras,
                                          dialect=self._dialect)
                             )

To use the example, put your test_input into a PCollection (for example with beam.Create) and apply the DictToCSV PTransform to it; the resulting PCollection of CSV-formatted strings can then be used as input for WriteToText. Note that you must provide a list or tuple of column names via the column_order argument, corresponding to the keys of your dictionary input elements; the columns of each CSV-formatted string will be in the order of the column names provided. Also, the lack of unicode support applies to the Python 2 cStringIO implementation; on Python 3, io.StringIO handles unicode text.
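
A CSV file usually also starts with a header row. beam.io.WriteToText accepts a header string that is written at the beginning of each output file, plus a file_name_suffix such as '.csv'. One option, sketched here as an assumption about how you might wire it up (csv_header is a hypothetical helper), is to build that header from the same column_order and dialect used for the rows:

```python
import csv
import io


def csv_header(column_order, dialect=csv.excel):
    """Hypothetical helper: format the column names as one CSV line using `dialect`."""
    buf = io.StringIO()
    csv.writer(buf, dialect=dialect).writerow(column_order)
    line = buf.getvalue()
    terminator = dialect.lineterminator
    # WriteToText adds a newline after the header itself, so strip the dialect's own
    return line[:-len(terminator)] if line.endswith(terminator) else line


column_order = ('label', 'text')
print(csv_header(column_order))                       # label,text
print(repr(csv_header(column_order, csv.excel_tab)))  # 'label\ttext'
```

In a pipeline this would be used roughly as pcoll | DictToCSV(column_order) | beam.io.WriteToText(path_to_gcs, file_name_suffix='.csv', header=csv_header(column_order)). Quoting the header with the csv module keeps it consistent with the data rows if a column name ever contains the delimiter.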

#2

Based on Andrew's suggestion, here is a ConvertDictToCSV function that I created:

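def ConvertDictToCSV(input_dict, fieldnames, separator=",", quotechar='"'):
  value_list = []
  for field in fieldnames:
    value = input_dict.get(field)
    # Only a missing key or None becomes an empty field; 0 and False are kept
    field_value = "" if value is None else str(value)
    # Quote fields containing the separator, the quote char or a line break,
    # doubling embedded quote chars the way csv.writer does
    if any(ch in field_value for ch in (separator, quotechar, "\n", "\r")):
      field_value = quotechar + field_value.replace(quotechar, quotechar * 2) + quotechar
    value_list.append(field_value)

  return separator.join(value_list)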

This appears to be working well, but it would certainly be safer to use csv.DictWriter if possible.
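
To illustrate why csv.DictWriter is the safer choice, the snippet below (sample values made up for illustration) shows what the csv module does with inputs a hand-rolled formatter easily gets wrong: embedded quote characters are doubled, a field containing a newline is quoted, a falsy value such as 0 is kept, and a missing key falls back to restval. Reading the line back recovers the original values:

```python
import csv
import io

row = {'label': 0, 'text': 'She said "hi"\nthen left'}  # illustrative tricky values

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['label', 'text', 'score'], restval='')
writer.writerow(row)  # 'score' is missing, so restval ('') is written for it
line = buf.getvalue()
print(repr(line))
# '0,"She said ""hi""\nthen left",\r\n'

# Parse the line back; newline='' is what the csv docs recommend for readers
parsed = next(csv.reader(io.StringIO(line, newline='')))
print(parsed)
# ['0', 'She said "hi"\nthen left', '']
```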
