Using CombinePerKey in Google Cloud Dataflow Python

Time: 2022-02-09 15:22:52

I'm trying to run a simple Dataflow Python pipeline that gets certain user events from BigQuery and produces a per-user event count.

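import google.cloud.dataflow as df

p = df.Pipeline(argv=pipeline_args)  # pipeline_args: command-line pipeline options, parsed elsewhere
result_query = "..."
data = p | df.io.Read(df.io.BigQuerySource(query=result_query))
# Key each BigQuery row by user ID, paired with a count of 1.
user_events = data | df.Map(lambda x: (x['users_user_id'], 1))
# Sum the 1s for each user ID.
user_event_counts = user_events | df.CombinePerKey(sum)
p.run()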

Running this gives me an error:

TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)']

Data before the CombinePerKey transform is in this form:

(u'55107178236374', 1)
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'2296845644499670', 1)
(u'2296845644499670', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
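As a point of reference, here is a plain-Python model of what CombinePerKey(sum) is expected to produce from the pairs above: one (user_id, count) result per key. combine_per_key is a hypothetical local helper, not part of the Dataflow SDK, and it ignores distribution entirely.

```python
from collections import defaultdict

# The (user_id, 1) pairs shown above, as they arrive at the CombinePerKey step.
pairs = [
    (u'55107178236374', 1),
    (u'55107178236374', 1),
    (u'55107178236374', 1),
    (u'2296845644499670', 1),
    (u'2296845644499670', 1),
    (u'1489727796186326', 1),
    (u'1489727796186326', 1),
    (u'1489727796186326', 1),
    (u'1489727796186326', 1),
]


def combine_per_key(kv_pairs, fn):
    """Hypothetical single-process model of CombinePerKey(fn): collect the
    values for each key, then reduce that list of values with fn."""
    grouped = defaultdict(list)
    for key, value in kv_pairs:
        grouped[key].append(value)
    return {key: fn(values) for key, values in grouped.items()}


user_event_counts = combine_per_key(pairs, sum)
print(user_event_counts)
# {'55107178236374': 3, '2296845644499670': 2, '1489727796186326': 4}
```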

If I instead calculate user_event_counts like this:

user_event_counts = (user_events | df.GroupByKey() |
    # kv is (user, iterable of ones); indexing avoids Python 2-only tuple-parameter unpacking
    df.Map('count', lambda kv: (kv[0], sum(kv[1]))))

then there are no errors and I get the result I expect.

Based on the docs, I would have expected similar behaviour from both approaches. I'm obviously missing something with respect to CombinePerKey, but I can't see what it is. Any tips appreciated!
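Conceptually, CombinePerKey(sum) should give the same result as GroupByKey followed by a Map that sums each group. One difference worth knowing: a runner may apply the combining callable to subsets of a key's values and then combine those partial results, so the callable needs to be associative and commutative. sum qualifies; len, for example, does not. The sketch below models both formulations in plain Python. The helper names and the chunk_size parameter are hypothetical and do not reflect how Dataflow actually splits data.

```python
from collections import defaultdict


def group_by_key(kv_pairs):
    """Local model of GroupByKey: key -> list of all values for that key."""
    grouped = defaultdict(list)
    for key, value in kv_pairs:
        grouped[key].append(value)
    return dict(grouped)


def via_group_by_key(kv_pairs):
    """The working approach from the question: GroupByKey, then sum the ones."""
    return {user: sum(ones) for user, ones in group_by_key(kv_pairs).items()}


def via_lifted_combine(kv_pairs, fn, chunk_size=2):
    """Model of CombinePerKey(fn) in which fn is applied to chunks of each
    key's values (standing in for separate bundles), then applied again to
    the partial results. chunk_size is a hypothetical bundle size."""
    result = {}
    for key, values in group_by_key(kv_pairs).items():
        partials = [fn(values[i:i + chunk_size])
                    for i in range(0, len(values), chunk_size)]
        result[key] = fn(partials)
    return result


pairs = [('a', 1), ('a', 1), ('a', 1), ('b', 1),
         ('c', 1), ('c', 1), ('c', 1), ('c', 1)]

print(via_group_by_key(pairs))         # {'a': 3, 'b': 1, 'c': 4}
print(via_lifted_combine(pairs, sum))  # {'a': 3, 'b': 1, 'c': 4}
# len is not safe to combine in stages: it counts partial results, not values.
print(via_lifted_combine(pairs, len))  # {'a': 2, 'b': 1, 'c': 2}
```

Because sum of partial sums equals the total sum, both formulations agree for this pipeline, so a difference in behaviour points at the SDK rather than at the choice of transform.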

1 Solution

#1

I'm guessing you are running a version of the SDK older than 0.2.4. This is a bug in how we handle combining operations in some scenarios. The issue is fixed in the latest release of the SDK, v0.2.4 (https://github.com/GoogleCloudPlatform/DataflowPythonSDK/releases/tag/v0.2.4). Sorry about that. Let us know if you still experience the issue with the latest release.
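To check whether an installed SDK predates the fixed release, a small comparison helper is enough. This is a sketch that assumes you already have the installed version as a plain dotted string (for example from `pip show` on the installed distribution); both function names are hypothetical, and pre-release suffixes such as "rc1" are not handled.

```python
def parse_version(version):
    """Turn a plain dotted release string like 'v0.2.4' or '0.2.3' into an int tuple."""
    return tuple(int(part) for part in version.lstrip('v').split('.'))


def needs_upgrade(installed, fixed='0.2.4'):
    """True if the installed release is older than the release containing the fix."""
    return parse_version(installed) < parse_version(fixed)


print(needs_upgrade('0.2.3'))    # True
print(needs_upgrade('v0.2.10'))  # False
```

Comparing integer tuples handles multi-digit components such as 0.2.10 correctly, which a plain string comparison would not ('0.2.10' < '0.2.4' as strings).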