Apache Beam / Google数据流 - 错误处理

时间:2023-01-25 15:34:48

I have a pipeline with quite a few steps (just above 15). I want to report failures everytime a DoFn fails. I started implementing it through TupleTags with code such as :


try {
 ... do stuff ...
 c.output(successTag, ...);
} catch (Exception e) {
 c.output(failureTag, new Failure(...));

But since my pipeline contains a lot of steps, this make the pipeline definition code quite hard to read / maintain.


Is there a more global way to achieve it ? Something like raising a custom exception which is handled globally at the pipeline level ?


1 个解决方案



What you are doing is the correct approach to catch errors and output them differently. You will need this on each step though. You could use a java pattern to reuse it if you prefer. Create a base class for all your ParDos and in processElement add the exception handling code. Then implement your processElement in a separate function (i.e. processElementImpl) which you call in processElement.




What you are doing is the correct approach to catch errors and output them differently. You will need this on each step though. You could use a java pattern to reuse it if you prefer. Create a base class for all your ParDos and in processElement add the exception handling code. Then implement your processElement in a separate function (i.e. processElementImpl) which you call in processElement.
