Google Dataflow / Apache Beam Python - 来自PCollection的Side-Input会导致性能下降

时间:2021-01-10 15:32:22

We are running logfile parsing jobs in google dataflow using the Python SDK. Data is spread over several 100s of daily logs, which we read via file-pattern from Cloud Storage. Data volume for all files is about 5-8 GB (gz files) with 50-80 million lines in total.

我们使用Python SDK在google dataflow中运行日志文件解析作业。数据分布在几百个日常日志中,我们通过云存储中的文件模式读取这些日志。所有文件的数据量约为5-8 GB(gz文件),总共有5千万到8千万行。

loglines = p | ReadFromText('gs://logfile-location/logs*-20180101')

In addition, we have a simple (small) mapping csv, that maps logfile-entries to human readable text. Has about 400 lines, 5 kb size.

此外,我们有一个简单的(小)映射csv,它将logfile条目映射到人类可读的文本。有大约400行,5 kb大小。

For Example a logfile entry with [param=testing2] should be mapped to "Customer requested 14day free product trial" in the final output.

例如,带有[param = testing2]的日志文件条目应映射到最终输出中的“客户请求的14天免费产品试用”。

We do this in a simple beam.Map with sideinput, like so:


customerActions = loglines | beam.Map(map_logentries,mappingTable)

where map_logentries is the mapping function and mappingTable is said mapping table.


However, this only works if we read the mapping table in native python via open() / read(). If we do the same utilising the beam pipeline via ReadFromText() and pass the resulting PCollection as side-input to the Map, like so:

但是,这只有在我们通过open()/ read()读取本机python中的映射表时才有效。如果我们通过ReadFromText()使用波束管道并将生成的PCollection作为侧输入传递给Map,如下所示:

mappingTable = p | ReadFromText('gs://side-inputs/category-mapping.csv')    
customerActions = loglines | beam.Map(map_logentries,beam.pvalue.AsIter(mappingTable))

performance breaks down completely to about 2-3 items per Second.


Now, my questions:


  1. Why would performance break so badly, what is wrong with passing a PCollection as side-input?
  2. 为什么性能会如此糟糕地破坏,将PCollection作为侧输入传递会出现什么问题呢?
  3. If it is maybe not recommended to use PCollections as side-input, how is one supposed to build such as pipeline that needs mappings that can/should not be hard coded into the mapping function?
  4. 如果可能不建议使用PCollections作为侧输入,那么应该如何构建诸如需要可以/不应该硬编码到映射函数中的映射的管道?

For us, the mapping does change frequently and I need to find a way to have "normal" users provide it. The idea was to have the mapping csv available in Cloud Storage, and simply incorporate it into the Pipeline via ReadFromText(). Reading it locally involves providing the mapping to the workers, so only the tech-team can do this.


I am aware that there are caching issues with side-input, but surely this should not apply to a 5kb input.


All code above is pseudo code to explain the problem. Any ideas and thoughts on this would be highly appreciated!


2 个解决方案



For more efficient side inputs (with small to medium size) you can utilize beam.pvalue.AsList(mappingTable) since AsList causes Beam to materialize the data, so you're sure that you will get in-memory list for that pcollection.


Intended for use in side-argument specification---the same places where AsSingleton and AsIter are used, but forces materialization of this PCollection as a list.






  1. The code looks fine. However, since mappingTable is a mapping, wouldn't beam.pvalue.AsDict be more appropriate for your use case?


  2. Your mappingTable is small enough so side input is a good use case here. Given that mappingTable is also static, you can load it from GCS in start_bundle function of your DoFn. See the answer to this post for more details. If mappingTable becomes very large in future, you can also consider converting your map_logentries and mappingTable into PCollection of key-value pairs and join them using CoGroupByKey.




For more efficient side inputs (with small to medium size) you can utilize beam.pvalue.AsList(mappingTable) since AsList causes Beam to materialize the data, so you're sure that you will get in-memory list for that pcollection.


Intended for use in side-argument specification---the same places where AsSingleton and AsIter are used, but forces materialization of this PCollection as a list.






  1. The code looks fine. However, since mappingTable is a mapping, wouldn't beam.pvalue.AsDict be more appropriate for your use case?


  2. Your mappingTable is small enough so side input is a good use case here. Given that mappingTable is also static, you can load it from GCS in start_bundle function of your DoFn. See the answer to this post for more details. If mappingTable becomes very large in future, you can also consider converting your map_logentries and mappingTable into PCollection of key-value pairs and join them using CoGroupByKey.
