GCP Dataflow: elements are not passed on to the next ParDo function

Time: 2021-07-12 15:34:14

*I fill the map with good data (no null values), but the elements never make it to the next ParDo function. I tried to debug it but couldn't understand why this is happening. If anyone knows what I'm doing wrong, please let me know. I'm including the three ParDo functions below. Thanks.*


.apply("Parse XML CarrierManifest ", ParDo.of(new DoFn<String, Manifest>() {
                    @ProcessElement
                    public void processeElement(ProcessContext c) {
                        try {

                            System.out.println(c.element());
                            JAXBContext jaxbContext = JAXBContext.newInstance(Manifest.class);
                            Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
                            StringReader reader = new StringReader(c.element());

                            Manifest manifest = (Manifest) unmarshaller.unmarshal(reader);

                            if (manifest == null) throw new RuntimeException("Invalid data");

                            c.output(manifest);

                        }

                        catch (Exception e)
                        {
                            LOG.error("Unexpected error while parsing input. File was <[ " + c.element() + " ]>", e);
                        }

                    }

                }
                )
                )

//---------------------------------------------------------------------------------------------------------------

        .apply("preparing  data  " , ParDo.of(new DoFn<Manifest,  Map<String, List<TableRow>>>()
                   {
                       @ProcessElement
                       public void processeElement(ProcessContext c)
                       {
                           Map<String, List<TableRow>> RowsTable = new ArrayMap<>();
                           RowsTable.put("Manifest",new ArrayList<>());

                           Manifest manifest = c.element();

                           Links linkss = manifest.linkes;

                           System.out.println(linkss.ShipmentsList.linakageShipment.linkageesList.size());

                           for (int i = 0; i < linkss.ShipmentsList.linakageShipment.linkageesList.size(); i++) {

                               RowsTable.get("Manifest")
                                     .add(new TableRow()
                                       .set("GROUP_ID", manifest.GroupidValue)
                                       .set("STATUS", manifest.StatusValue)
                                       .set("GROUP_TYPE", manifest.typeValue)
                                       .set("CREATED_AT", manifest.created_atValue)
                                       .set("READY_AT", manifest.ready_atValue)
                                       .set("MANIFEST_NUMBER", manifest.manifest_numberValue)
                                       .set("LINKS_SELF", linkss.SelfLink)

                                       .set("SHIPMENT_ID", linkss.ShipmentsList.linakageShipment.linkageesList.get(i).ID)
                                       .set("SHIPMENT_TYPE", linkss.ShipmentsList.linakageShipment.linkageesList.get(i).Type));
                           }
                            c.output(RowsTable);
                       }

                   }))
//---------------------------------------------------------------------------------------------------------------

               .apply("change rows list to one row ",ParDo.of(new DoFn<Map<String, List<TableRow>>, TableRow>()
               {
                   @ProcessElement
                   public void processElement(ProcessContext c)
                   {
                       System.out.println("id: " + c.element());
                       for (TableRow r : c.element().get("Manifest")) // Should only have 1
                           c.output(r);
                   }
               }))

1 solution

#1

From one of your comments in your question I understand that the issue is that your Dataflow pipeline only works when run in Dataflow itself (with the Dataflow Runner), but it does not work locally when you use the Direct Runner instead.

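For reference, switching between the two runners is only a matter of pipeline options. A minimal sketch of how the runner is typically selected (this is an illustration, not code from the question):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSelection {
    public static void main(String[] args) {
        // Parsed from the command line, e.g. --runner=DirectRunner for local
        // debugging, or --runner=DataflowRunner to execute on GCP.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

        // Alternatively, force the Direct Runner programmatically:
        options.setRunner(DirectRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... attach the three ParDo transforms from the question here ...
        p.run().waitUntilFinish();
    }
}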

As explained in the documentation for Apache Beam's Direct Runner, local execution is limited by the memory available locally, and it is recommended that debugging is done with small data sets that your local machine can handle. In any case, from that same comment I understand that your pipeline works well when executed in Dataflow, and therefore there is no issue with the pipeline itself.

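For example, a small in-memory sample can be pushed through the same transforms with Create.of, which keeps the Direct Runner well within local memory. A minimal sketch (the sample XML string is a placeholder, not real manifest data):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class SmallSampleDebug {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // A single hand-written element instead of the full input,
        // so the Direct Runner only holds a tiny dataset in memory.
        String sampleXml = "<manifest>...</manifest>"; // placeholder content

        p.apply("Sample input", Create.of(sampleXml))
         // ... chain "Parse XML CarrierManifest" and the following ParDos here ...
         ;

        p.run().waitUntilFinish();
    }
}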

As per the description you provided, it definitely looks like the issue is related to the limitations of the Direct Runner. If you have more specific errors in your local or remote environment, please add them to the question description and reply to the comments requesting more information, so that others are able to assist you.

