
Time: 2022-05-28 15:34:53

We have a custom combine function (on Beam SDK 2.0) in which millions of objects get accumulated, but they do NOT necessarily get reduced. That is, they are sometimes added to a List, so the List can eventually get quite large (hundreds of megabytes, even gigabytes).

To minimize having to "pass around" these objects between nodes (when accumulators are merged), we've created a SINGLE giant node (64 cores, tons of RAM).


So, in "theory", Dataflow should not need to serialize the List (or any of the big objects in it), even during "merge accumulator" operations, since all the objects are on the same node. But does Dataflow still serialize even when all the objects of interest are on the same node, or is it smart enough to tell whether an object is on the same node or on a different one?


Ideally, when objects are on the same node, we could just pass around references to them rather than serializing/deserializing their contents, which can be very, very large. (I understand, of course, that when dealing with multiple nodes there's no choice but to serialize/deserialize, since the data has to be passed around somehow. But within a node, is Beam SDK 2.0 smart enough to avoid serializing/deserializing in these combine functions, group-bys, etc.?)

1 solution



The Dataflow service aggressively optimizes your pipeline to avoid needless serialization. The optimization you are interested in is fusion, described in the Dataflow documentation. When data moves through a fused "stage" (a sequence of low-level instructions roughly corresponding to steps in your input pipeline), it is not serialized and deserialized between those steps.
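To make the fusion point concrete, here is a toy Java model (plain JDK, not Beam or Dataflow code) of the difference between two steps fused into one stage and the same two steps separated by a stage boundary. The names `FusionToyModel`, `requireNonEmpty`, `fusedStage` and `unfusedStages` are hypothetical, and Java serialization is an assumed stand-in for whatever encoding a real runner uses at a boundary. It sketches the idea only; it is not how Dataflow implements fusion.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.function.Function;

// Toy model of fusion vs. a stage boundary; plain JDK, not Beam or Dataflow code.
final class FusionToyModel {

    // Hypothetical pipeline step: checks its input and passes the same object on.
    static ArrayList<Integer> requireNonEmpty(ArrayList<Integer> element) {
        if (element.isEmpty()) {
            throw new IllegalArgumentException("empty element");
        }
        return element;
    }

    // Fused: two steps composed into one call chain, so the element travels
    // between them as a Java reference and is never encoded.
    static ArrayList<Integer> fusedStage(ArrayList<Integer> element) {
        Function<ArrayList<Integer>, ArrayList<Integer>> step = FusionToyModel::requireNonEmpty;
        return step.andThen(step).apply(element);
    }

    // Unfused: a boundary between the same two steps forces an encode/decode
    // round trip, which copies the element and costs work proportional to its size.
    static ArrayList<Integer> unfusedStages(ArrayList<Integer> element) {
        ArrayList<Integer> afterFirst = requireNonEmpty(element);
        ArrayList<Integer> decoded = decode(encode(afterFirst));
        return requireNonEmpty(decoded);
    }

    // Encodes a value to bytes with Java serialization.
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes bytes produced by encode() back into a (new) object.
    @SuppressWarnings("unchecked")
    static <T> T decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the fused case the very same object comes out the other end; across the boundary you get an equal copy, and the cost of producing it grows with the size of the element, which is what matters for lists of hundreds of megabytes.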


However, if your CombineFn builds a list, and that list grows large, you should try to rephrase your pipeline to use a raw GroupByKey. Another important optimization is "combiner lifting" or "mapper-side combine", where your CombineFn is applied per key locally before your data is shuffled between machines, on the assumption that the accumulator will be smaller than a plain list of the elements. When the accumulator is itself a list of the elements, that assumption fails: the whole list will be serialized, shuffled, and deserialized before the Combine transform completes. If you instead use a GroupByKey directly, your elements are streamed much more efficiently, without an entire list ever being serialized.
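The combiner-lifting argument can be illustrated with another toy Java model (again plain JDK, not Beam). The `Combiner` interface is a hypothetical, stripped-down stand-in for the accumulator side of a `CombineFn`; `SUM` and `COLLECT` are illustrative combiners. Each simulated worker folds its local inputs for one key into a partial accumulator, and the sketch counts the bytes those partials would occupy in the shuffle, using Java serialization as an assumed encoding rather than a real Beam `Coder`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

// Toy model of combiner lifting (mapper-side combine); plain JDK, not Beam code.
final class CombinerLiftingToyModel {

    // Hypothetical, stripped-down stand-in for the accumulator side of a CombineFn.
    interface Combiner<A extends Serializable> {
        A create();               // plays the role of createAccumulator()
        A add(A acc, int input);  // plays the role of addInput()
    }

    // Reducing combiner: the accumulator is one number, however many inputs arrive.
    static final Combiner<Long> SUM = new Combiner<Long>() {
        @Override public Long create() { return 0L; }
        @Override public Long add(Long acc, int input) { return acc + input; }
    };

    // List-building combiner: the accumulator grows with every input.
    static final Combiner<ArrayList<Integer>> COLLECT = new Combiner<ArrayList<Integer>>() {
        @Override public ArrayList<Integer> create() { return new ArrayList<>(); }
        @Override public ArrayList<Integer> add(ArrayList<Integer> acc, int input) {
            acc.add(input);
            return acc;
        }
    };

    // Each simulated worker pre-combines its local inputs for one key; the
    // partial accumulators are then encoded for the shuffle. Returns the total
    // number of bytes that would be sent between machines.
    static <A extends Serializable> long shuffledBytes(Combiner<A> fn, int[][] inputsPerWorker) {
        long total = 0;
        for (int[] local : inputsPerWorker) {
            A partial = fn.create();
            for (int x : local) {
                partial = fn.add(partial, x);
            }
            total += serializedSize(partial);
        }
        return total;
    }

    // Size of a value under Java serialization (assumed stand-in for a Coder).
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

With `SUM`, the shuffled bytes stay the same however many inputs each worker sees; with `COLLECT`, they grow with the input, so lifting saves nothing and the full list is still serialized. In Beam itself, the suggested rewrite replaces the `Combine.perKey(...)` step with `GroupByKey.create()` and consumes the resulting `Iterable` of values in a `DoFn`.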


I should note that Beam's other runners also perform fusion and other standard optimizations. These generally come from functional programming work in the late '80s and early '90s and were applied to distributed data processing in FlumeJava around 2010, so they are a baseline expectation now.
