What is the equivalent of Python's zip function in Dataflow?

Time: 2021-07-12 15:34:26

I'm using the Python apache_beam version of Dataflow. I have about 300 files, each containing an array of 4 million entries. The whole dataset is about 5 GB, stored in a GCS bucket.

I can easily produce a PCollection of the arrays {x_1, ..., x_n} by reading each file, but the operation I now need to perform is like Python's zip function: I want a PCollection indexed from 0 to n-1, where element i contains the array of all the x_i across the files. I tried yielding (i, element) for each element and then running a GroupByKey, but that is much too slow and inefficient (it won't run at all locally because of memory constraints, and it took 24 hours on the cloud, whereas I'm sure I could at least load the whole dataset if I wanted to).
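
For concreteness, here is a minimal sketch of the keyed approach described above (emit (i, element) pairs, then GroupByKey). The file pattern and the parse_array helper are placeholders for illustration, not from the original post:

```python
import apache_beam as beam
from apache_beam.io import fileio

def parse_array(raw_bytes):
    # Hypothetical parser: assumes one numeric entry per line of the file.
    return [float(line) for line in raw_bytes.splitlines()]

def index_elements(array):
    # Emit (index, value) so the i-th entries of every file share a key,
    # mimicking zip across files.
    for i, x in enumerate(array):
        yield (i, x)

with beam.Pipeline() as p:
    zipped = (
        p
        | 'MatchFiles' >> fileio.MatchFiles('gs://my-bucket/arrays/*')  # placeholder pattern
        | 'ReadMatches' >> fileio.ReadMatches()
        | 'ParseArray' >> beam.Map(lambda f: parse_array(f.read()))
        | 'IndexElements' >> beam.FlatMap(index_elements)
        | 'GroupByIndex' >> beam.GroupByKey()  # (i, iterable of the i-th entry of each file)
    )
```

Note that the GroupByKey shuffles every element individually, roughly 300 × 4 million = 1.2 billion key-value pairs, which is consistent with the cost observed.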

How do I restructure the pipeline to do this cleanly?

1 solution

#1

As jkff pointed out in the comment above, the code is indeed correct, and the procedure is the recommended way of programming a TensorFlow algorithm. The DoFn applied to each element was the bottleneck.
