apache_beam.transforms.util.Reshuffle() not available for GCP Dataflow

Time: 2022-02-17 15:18:20

I have upgraded to the latest apache_beam[gcp] package via pip install --upgrade apache_beam[gcp]. However, I noticed that Reshuffle() does not appear in the [gcp] distribution. Does this mean that I will not be able to use Reshuffle() in any Dataflow pipelines? Is there any way around this? Or is it possible that the pip package is simply not up to date, and if Reshuffle() is in master on GitHub, will it then be available on Dataflow?

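For reference, whether the locally installed release actually ships the transform can be checked directly from Python (a minimal check, assuming apache_beam imports cleanly):

import apache_beam as beam
import apache_beam.transforms.util as util

print(beam.__version__)            # version of the installed SDK
print(hasattr(util, 'Reshuffle'))  # True if this release includes Reshuffle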

Based on the response to this question, I am trying to read data from BigQuery and then randomize the data before writing it to CSVs in a GCP storage bucket. I have noticed that the sharded .csv files I am using to train my GCMLE model are not truly random. Within TensorFlow I can shuffle the batches, but that only randomizes the rows within each file that is built up in the queue, and my issue is that the files currently being generated are biased in some way. Any suggestions for other ways to shuffle right before writing to CSV in Dataflow would be much appreciated.

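For context, here is a minimal sketch of the kind of pipeline being described; the query, the output path, the to_csv_line formatter, and pipeline_options are hypothetical placeholders, and the 'Shuffle' step is the part discussed in the answer below:

import apache_beam as beam

def to_csv_line(row):
    # Hypothetical formatter: turn a BigQuery row dict into one CSV line.
    return ','.join(str(row[field]) for field in sorted(row))

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'ReadFromBQ' >> beam.io.ReadFromBigQuery(
           query='SELECT * FROM `my_project.my_dataset.my_table`',  # placeholder
           use_standard_sql=True)
     | 'Shuffle' >> beam.Reshuffle()  # or the manual shuffle from the answer below
     | 'ToCsvLine' >> beam.Map(to_csv_line)
     | 'WriteShards' >> beam.io.WriteToText(
           'gs://my-bucket/training/data',  # placeholder output prefix
           file_name_suffix='.csv',
           num_shards=10))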

1 solution

#1


One approach is to recreate the shuffle myself.


import random

from apache_beam import FlatMap, GroupByKey, Map

shuffled_data = (unshuffled_pcoll
        # Attach a random 32-bit key to each element.
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        # Group by the random keys, redistributing elements across workers.
        | 'GroupByKey' >> GroupByKey()
        # Drop the keys and emit the shuffled elements one by one.
        | 'RemoveRandomKeys' >> FlatMap(lambda t: t[1]))
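
For what it's worth, current apache_beam releases expose this pattern as a built-in transform; a minimal sketch, assuming a reasonably recent apache_beam[gcp] version and the same unshuffled_pcoll:

import apache_beam as beam

# Reshuffle applies essentially the same add-key / group / strip-key pattern
# internally and also acts as a fusion break, letting the runner rebalance work.
shuffled_data = unshuffled_pcoll | 'Reshuffle' >> beam.Reshuffle()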

My remaining question is whether I need to worry about the windowing or ExpandIterable sections of that code.
