Sometimes I get the following exception while running a streaming Dataflow pipeline:
exception: "java.lang.RuntimeException: Exception while fetching side input:
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:184)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:175)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.access$400(StreamingModeExecutionContext.java:56)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:401)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:135)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:175)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:117)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.startBundle(ForwardingParDoFn.java:36)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.start(ParDoOperation.java:45)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:719)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:95)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:538)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for 2059
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2207)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4790)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:175)
... 16 more
Caused by: java.lang.IllegalArgumentException: Duplicate values for 2059
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:291)
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:273)
at com.google.cloud.dataflow.sdk.util.PCollectionViews$PCollectionViewBase.fromIterableInternal(PCollectionViews.java:368)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:152)
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:104)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4793)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
... 19 more
Dataflow worker machine type: n1-standard-4
Worker cache memory (MB): 2048
Main input: a Pub/Sub subscription. I create a side input from Bigtable (BT) and pass that side input to multiple transformations. The side input is smaller than 100 MB.
Thanks.
Answer:
That error means that multiple values with the same key (2059) were encountered, which violates the expectation of a Map-valued side input (`View.asMap()`) that each key appears exactly once. This can happen in streaming, especially if the same value is triggered (emitted) more than once. If you use a Multimap-valued side input (`View.asMultimap()`) instead, you should be able to retrieve all of the values associated with a given key.
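The difference between the two view types can be modeled in plain Java. This is a minimal sketch, not the Dataflow SDK: `SideInputViewModel`, `asMap` and `asMultimap` below are hypothetical helpers that only mimic how a map-valued view rejects a repeated key (using the same message text as the exception in the trace) and how a multimap-valued view keeps every value. In a real pipeline you would apply `View.asMultimap()` to the `PCollection<KV<K, V>>` and read the side input as a `Map<K, Iterable<V>>`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not the Dataflow SDK) of how a map-valued
 * and a multimap-valued side input treat repeated keys.
 */
public class SideInputViewModel {

    /** Mimics a Map view: every key must appear exactly once. */
    public static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> elements) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : elements) {
            if (result.containsKey(e.getKey())) {
                // Same message text as the IllegalArgumentException in the stack trace.
                throw new IllegalArgumentException("Duplicate values for " + e.getKey());
            }
            result.put(e.getKey(), e.getValue());
        }
        return Collections.unmodifiableMap(result);
    }

    /** Mimics a Multimap view: all values for a key are kept, in arrival order. */
    public static <K, V> Map<K, List<V>> asMultimap(List<Map.Entry<K, V>> elements) {
        Map<K, List<V>> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : elements) {
            result.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Key 2059 arrives twice, e.g. because the same value was emitted more than once.
        List<Map.Entry<Integer, String>> elements = List.of(
                Map.entry(2059, "first"),
                Map.entry(42, "other"),
                Map.entry(2059, "second"));

        try {
            asMap(elements);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // Duplicate values for 2059
        }
        System.out.println(asMultimap(elements).get(2059)); // [first, second]
    }
}
```

With the multimap view, the consuming `DoFn` has to decide which of the values for a key to use (for example, the most recent one), because the view no longer guarantees a single value per key.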