com.google.cloud.dataflow.sdk.util.IllegalMutationException in a DoFn

Date: 2022-08-28 15:26:54

I wrote a DoFn<String, Set<SectionBodyRecord>> that outputs a Set. When I execute my pipeline, I get the following exception:

Caused by: com.google.cloud.dataflow.sdk.util.IllegalMutationException: DoFn UnmarshalGcsPath mutated value
    [SectionBodyRecord{txId='3UR93528NX413902J'},
    SectionBodyRecord{txId='15N97640P5806660M'},
    SectionBodyRecord{txId='7TG473112Y9407154'},
    SectionBodyRecord{txId='9A1906561E887050P'},
    SectionBodyRecord{txId='4FP63718R4365381L'}]
    after it was output (new value was
    [SectionBodyRecord{txId='9A1906561E887050P'},
    SectionBodyRecord{txId='7TG473112Y9407154'},
    SectionBodyRecord{txId='3UR93528NX413902J'},
    SectionBodyRecord{txId='4FP63718R4365381L'},
    SectionBodyRecord{txId='15N97640P5806660M'}]).
    Values must not be mutated in any way after being output.
        at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.verifyOutputUnmodified(ParDo.java:1344)
        at com.google.cloud.dataflow.sdk.transforms.ParDo$ImmutabilityCheckingOutputManager.output(ParDo.java:1306)
        at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
        at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
        at my.tests.pipelines.CsvToDatastore$UnmarshalGcsPath.processElement(CsvToDatastore.java:185)

I do not understand where the problem could be. I have also tried the following:

ImmutableSet<SectionBodyRecord> irecs = ImmutableSet.copyOf(records);
c.output(irecs);

but the problem remains.

Any suggestions?

Thanks in advance, Michael

1 answer

#1

I guess you are using a HashSet under the hood and have not overridden hashCode() in SectionBodyRecord; at least, that was my problem.

You can either override hashCode(), e.g. public int hashCode() { return txId.hashCode(); } (together with a matching equals()), or use a TreeSet instead and have SectionBodyRecord implement the Comparable<SectionBodyRecord> interface.
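
Both options can be sketched in plain Java as follows. This is a minimal sketch, assuming SectionBodyRecord can be reduced to its txId field (the only field visible in the exception message); the wrapper class SectionBodyRecordDemo and the getTxId() accessor are hypothetical. equals() is overridden together with hashCode(), as the java.lang.Object contract requires, and compareTo() is kept consistent with equals() so the same class behaves well in both a HashSet and a TreeSet.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

public class SectionBodyRecordDemo {

    // Hypothetical, minimal stand-in for the question's SectionBodyRecord:
    // only txId is known from the exception message, other fields are omitted.
    static final class SectionBodyRecord implements Comparable<SectionBodyRecord> {
        private final String txId;

        SectionBodyRecord(String txId) {
            this.txId = Objects.requireNonNull(txId);
        }

        String getTxId() {
            return txId;
        }

        // Option 1: value-based equals/hashCode, so a HashSet places equal
        // records in the same buckets regardless of which instances it holds.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SectionBodyRecord)) return false;
            return txId.equals(((SectionBodyRecord) o).txId);
        }

        @Override
        public int hashCode() {
            return txId.hashCode();
        }

        // Option 2: a natural ordering, consistent with equals, for a TreeSet.
        @Override
        public int compareTo(SectionBodyRecord other) {
            return txId.compareTo(other.txId);
        }

        @Override
        public String toString() {
            return "SectionBodyRecord{txId='" + txId + "'}";
        }
    }

    public static void main(String[] args) {
        String[] ids = {"3UR93528NX413902J", "15N97640P5806660M",
                "7TG473112Y9407154", "9A1906561E887050P", "4FP63718R4365381L"};

        Set<SectionBodyRecord> hashed = new HashSet<>();
        Set<SectionBodyRecord> sorted = new TreeSet<>();
        for (String id : ids) {
            hashed.add(new SectionBodyRecord(id));
            sorted.add(new SectionBodyRecord(id));
        }

        // Separate instances with the same txId now count as equal.
        System.out.println(hashed.equals(sorted)); // true
        // The TreeSet iterates in txId order: 15N..., 3UR..., 4FP..., 7TG..., 9A1...
        System.out.println(sorted);
    }
}
```

If records should be distinguished by more than txId, equals(), hashCode() and compareTo() would all need to take those extra fields into account.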

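You get the error because the elements of the set change positions every time the set is created. Without an overridden hashCode(), SectionBodyRecord falls back to Object.hashCode(), which depends on object identity rather than on txId, so there is no deterministic way to pin down the position of your elements in a HashSet (or their order in a TreeSet, which needs a Comparable implementation). When the set is rebuilt from new objects, the elements can come out in a different order, as in the two listings in the exception, and the SDK reports that as a mutation.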
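
The position argument can be checked without any Dataflow code. The sketch below uses a hypothetical Rec class with a txId-based hashCode(); it copies a HashSet element by element into a new HashSet made of fresh instances (roughly what happens when a set is decoded into new objects; the SDK's actual coder is not reproduced here) and compares the iteration order of the two sets.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RebuildOrderDemo {

    // Hypothetical record with value-based equals/hashCode on txId.
    static final class Rec {
        final String txId;

        Rec(String txId) {
            this.txId = txId;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Rec && ((Rec) o).txId.equals(txId);
        }

        @Override
        public int hashCode() {
            return txId.hashCode();
        }
    }

    // Returns the txIds in the set's iteration order.
    static List<String> order(Set<Rec> set) {
        List<String> out = new ArrayList<>();
        for (Rec r : set) {
            out.add(r.txId);
        }
        return out;
    }

    // Builds a set, rebuilds it from fresh Rec instances in its own iteration
    // order, and reports whether both sets iterate in the same order.
    static boolean orderSurvivesRebuild(List<String> ids) {
        Set<Rec> original = new HashSet<>();
        for (String id : ids) {
            original.add(new Rec(id));
        }

        Set<Rec> rebuilt = new HashSet<>();
        for (Rec r : original) {
            rebuilt.add(new Rec(r.txId)); // new identity, same value
        }

        return order(original).equals(order(rebuilt));
    }

    public static void main(String[] args) {
        List<String> ids = List.of("3UR93528NX413902J", "15N97640P5806660M",
                "7TG473112Y9407154", "9A1906561E887050P", "4FP63718R4365381L");
        System.out.println(orderSurvivesRebuild(ids)); // true
    }
}
```

With the equals() and hashCode() overrides removed, Rec falls back to identity hash codes and the two orders can differ from run to run, so that case is deliberately not asserted here.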