I'm trying to run a simple Dataflow Python pipeline that gets certain user events from BigQuery and produces a per-user event count.
p = df.Pipeline(argv=pipeline_args)
result_query = "..."
data = p | df.io.Read(df.io.BigQuerySource(query=result_query))
user_events = data | df.Map(lambda x: (x['users_user_id'], 1))
user_event_counts = user_events | df.CombinePerKey(sum)
Running this gives me an error:
TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)']
The data before the CombinePerKey transform is in this form:
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'2296845644499670', 1)
(u'2296845644499670', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
If I instead calculate user_event_counts with this:
user_event_counts = (user_events
                     | df.GroupByKey()
                     | df.Map('count', lambda (user, ones): (user, sum(ones))))
then there are no errors and I get the result I expect.
Based on the docs, I would have expected similar behaviour from both approaches. I'm obviously missing something with respect to CombinePerKey, but I can't see what it is. Any tips appreciated!
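To make the expected equivalence concrete, here is a minimal plain-Python sketch that does not use the Dataflow SDK at all. The helpers group_by_key, combine_per_key and group_then_map are hypothetical stand-ins written for illustration; they only mimic what the two pipeline shapes are supposed to compute on the sample data shown earlier.

```python
from collections import defaultdict

# Sample (user_id, 1) pairs copied from the question.
user_events = [
    ("55107178236374", 1), ("55107178236374", 1), ("55107178236374", 1),
    ("2296845644499670", 1), ("2296845644499670", 1),
    ("1489727796186326", 1), ("1489727796186326", 1),
    ("1489727796186326", 1), ("1489727796186326", 1),
]


def group_by_key(pairs):
    """Hypothetical stand-in for GroupByKey: collect values per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_per_key(pairs, combine_fn):
    """Hypothetical stand-in for CombinePerKey(combine_fn)."""
    return {key: combine_fn(values)
            for key, values in group_by_key(pairs).items()}


def group_then_map(pairs):
    """Hypothetical stand-in for GroupByKey followed by a Map that sums."""
    return {user: sum(ones) for user, ones in group_by_key(pairs).items()}


counts_a = combine_per_key(user_events, sum)
counts_b = group_then_map(user_events)
print(counts_a == counts_b)
```

Both helpers reduce each user's list of ones with sum, so any difference between the two real pipelines would have to come from the SDK rather than from the logic itself.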
1 Solution
#1
I'm guessing you are running a version of the SDK earlier than 0.2.4. This is a bug in how we handle combining operations in some scenarios. The issue is fixed in the latest release of the SDK (v0.2.4): https://github.com/GoogleCloudPlatform/DataflowPythonSDK/releases/tag/v0.2.4
Sorry about that. Let us know if you still experience the issue with the latest release.
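As a rough way to check whether an installed version already includes the fix, a version string can be compared against 0.2.4. This is only a sketch: parse_version and has_combine_fix are hypothetical helpers, and it assumes the version is a plain dotted string of integers, such as the one your package manager reports for the SDK.

```python
def parse_version(text):
    """Turn a dotted version such as '0.2.4' into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def has_combine_fix(installed, fixed="0.2.4"):
    """Return True if `installed` is at or above the release with the fix."""
    return parse_version(installed) >= parse_version(fixed)


# Hypothetical version strings, used only to exercise the helper.
for version in ("0.2.3", "0.2.4", "0.2.10"):
    print(version, has_combine_fix(version))
```

Comparing tuples of integers rather than raw strings avoids treating 0.2.10 as older than 0.2.4.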