Creating large CSV data using Google Cloud Dataflow

Time: 2022-01-30 15:33:34

I need to create a large CSV file of ~2 billion records, with a header. It takes a long time to create with a standalone script; however, since the records are not related to each other, I understand Cloud Dataflow can distribute the work by spinning up multiple worker GCE machines of my choice. Does Cloud Dataflow always need to have an input? Here I am trying to programmatically generate data of the following format:


ItemId,   ItemQuantity, ItemPrice, SaleValue, SaleDate
item0001, 25,           100,       2500,      2017-03-18
item0002, 50,           200,       10000,     2017-03-25

Note


- ItemId can be suffixed with any random number between 0001 and 9999
- ItemQuantity can be a random value between 1 and 1000
- ItemPrice can be a random value between 1 and 100
- SaleValue = ItemQuantity * ItemPrice
- SaleDate is between 2015-01-01 and 2017-12-31

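For concreteness, here is a minimal sketch of one such row in Python (the language used in the answer below); the make_row name and the exact formatting are illustrative assumptions, not part of the original question:

import random
import datetime

def make_row():
    # Hypothetical helper: builds one CSV line following the rules above.
    item_id = 'item%04d' % random.randint(1, 9999)   # item0001 .. item9999
    quantity = random.randint(1, 1000)               # ItemQuantity: 1 .. 1000
    price = random.randint(1, 100)                   # ItemPrice: 1 .. 100
    sale_value = quantity * price                    # SaleValue = ItemQuantity * ItemPrice
    start = datetime.date(2015, 1, 1)
    end = datetime.date(2017, 12, 31)
    sale_date = start + datetime.timedelta(days=random.randint(0, (end - start).days))
    return '%s,%d,%d,%d,%s' % (item_id, quantity, price, sale_value, sale_date.isoformat())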

Any language is fine.


Continuing from the question Generating large file using Google Cloud Dataflow.


1 solution

#1



Currently, there is not a very elegant way of doing this. In Python you would do it like this (the same approach works in Java; only the syntax changes):


import apache_beam as beam

def generate_keys(_):
  # Generate 2000 key-value pairs to shuffle across workers
  for i in range(2000):
    yield (i, 0)

def generate_random_elements(_):
  # Each of the 2000 keys emits 1,000,000 rows (~2 billion rows in total)
  for i in range(1000000):
    yield random_element()

p = beam.Pipeline(options=my_options)  # my_options: your PipelineOptions
(p
 | beam.Create(['any'])
 | beam.FlatMap(generate_keys)
 | beam.GroupByKey()
 | beam.FlatMap(generate_random_elements)
 | beam.io.WriteToText('gs://bucket-name/file-prefix'))
p.run()
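Note that random_element() is left undefined above, and each element it yields must already be a fully formatted CSV line, because WriteToText writes one element per line. A minimal sketch of how the gaps could be filled, assuming the hypothetical make_row() helper from the question section and WriteToText's header parameter (which writes the column names at the top of each output shard):

import apache_beam as beam

def random_element():
    # Delegates to the hypothetical make_row() helper sketched in the question;
    # it returns one CSV-formatted line such as "item0001,25,100,2500,2017-03-18".
    return make_row()

# This transform could replace the plain WriteToText step in the pipeline above.
write_csv = beam.io.WriteToText(
    'gs://bucket-name/file-prefix',
    header='ItemId,ItemQuantity,ItemPrice,SaleValue,SaleDate')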

In generate_keys() we generate 2000 different keys, and then we run GroupByKey so that they are shuffled to different workers. We need to do this because a single DoFn currently cannot be split across several workers. (Once SplittableDoFn is implemented, this will be much easier.)


As a note, when Dataflow writes results out to sinks, it commonly splits them into multiple files (e.g. gs://bucket-name/file-prefix-0000-00001, and so on), so you'll need to condense the files together yourself if you want a single CSV.

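If a single CSV file is required, one possible approach (a sketch, not part of the original answer) is to stitch the shards together with Cloud Storage's compose operation; compose accepts at most 32 source objects per call, so a very large output has to be composed in batches. The bucket and object names below are placeholders:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucket-name')

# List the shards the pipeline wrote under the 'file-prefix' prefix.
shards = sorted(bucket.list_blobs(prefix='file-prefix'), key=lambda b: b.name)

# compose() concatenates up to 32 source objects into one target object;
# with more shards, compose in batches and then compose the intermediate results.
combined = bucket.blob('combined/sales.csv')
combined.compose(shards[:32])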

Also, you can pass --num_workers 10 (or however many workers you want Dataflow to spawn), or rely on autoscaling.

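For example, the worker count and autoscaling behaviour are set through the standard Dataflow pipeline options; the project, region, bucket, and worker numbers below are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

my_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                  # placeholder project id
    '--region=us-central1',                  # placeholder region
    '--temp_location=gs://bucket-name/tmp',
    '--num_workers=10',                      # fixed number of workers ...
    # ... or let the service autoscale instead:
    # '--autoscaling_algorithm=THROUGHPUT_BASED',
    # '--max_num_workers=50',
])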
