将数据流中的STRUCT数组写入大查询

时间:2021-03-27 15:35:31

I am trying to write an Array of Structs field from my Dataflow pipeline to big query, the schema of the table generated is correct but no data gets populated in the fields.

我正在尝试从我的Dataflow管道向大查询编写一个Structs数组字段,生成的表的模式是正确的,但字段中没有填充数据。

My DoFn function:

我的DoFn功能:

public class ProcessIpBlocks {

    public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c) throws JSONException {

            TableRow row = c.element();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Calendar cal = Calendar.getInstance();


            long startIp = 0L, endIp = 0L;
            if(row.get("start_ip") != null)
                startIp = Long.parseLong((String)row.get("start_ip"));

            if(row.get("end_ip") != null)
                endIp = Long.parseLong((String)row.get("end_ip"));

            for(long i= startIp; i<=endIp; i++)
            {
                TableRow outputRow = new TableRow();
                outputRow.set("start_ip", startIp);
                outputRow.set("ip", i);

                if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty()){

                    System.out.println("This is getting written to logs");
                    endIp = Long.parseLong((String)row.get("end_ip"));
                    JSONArray atrArray = new JSONArray();

                    JSONObject atr = new JSONObject();
                    atr.put("id", "zippostal_code");

                    JSONArray atrValueArray = new JSONArray();
                    atr.put("value", atrValueArray.put((String)row.get("postal_code")));


                    atr.put("pr", 0.5);
                    atr.put("dt", cal.getTime());
                    atrArray.put(atr);
                    outputRow.set("atr", atrArray);
                }

                c.output(outputRow);
            }
        }
    }

}

My pipeline write step:

我的管道编写步骤:

iPBlocksToIPData.apply("Foo", ParDo.of(new ProcessIpBlocks.IpBlocksToIp()))
        .apply(BigQueryIO.Write
                .named("WriteIPs")
                .to(String.format("%1$s:%2$s.%3$s",projectId, eventDataset, ipBlocksToIpTable))
                .withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

1 个解决方案

#1


3  

Below solution worked, using TableRow instead of JSONArray

下面的解决方案工作,使用TableRow而不是JSONArray

public class Foo {

公共课Foo {

public static class Foo extends DoFn<TableRow, TableRow> {


    @Override
    public void processElement(ProcessContext c) throws JSONException {

        TableRow row = c.element();
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar cal = Calendar.getInstance();


        long startIp = 0L, endIp = 0L;
        if(row.get("start_ip") != null)
            startIp = Long.parseLong((String)row.get("start_ip"));

        if(row.get("end_ip") != null)
            endIp = Long.parseLong((String)row.get("end_ip"));

        for(long i= startIp; i<=endIp; i++)
        {
            TableRow outputRow = new TableRow();
            outputRow.set("start_ip", startIp);
            outputRow.set("ip", i);

            if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty()){

                endIp = Long.parseLong((String)row.get("end_ip"));

                TableRow atrRow = new TableRow();
                atrRow.set("id", "zippostal_code");
                atrRow.set("value", new String[] {(String)row.get("postal_code")});



                outputRow.set("atr", atrRow);
            }

            System.out.println(outputRow);

            c.output(outputRow);
        }
    }
}

#1


3  

Below solution worked, using TableRow instead of JSONArray

下面的解决方案工作,使用TableRow而不是JSONArray

public class Foo {

公共课Foo {

public static class Foo extends DoFn<TableRow, TableRow> {


    @Override
    public void processElement(ProcessContext c) throws JSONException {

        TableRow row = c.element();
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar cal = Calendar.getInstance();


        long startIp = 0L, endIp = 0L;
        if(row.get("start_ip") != null)
            startIp = Long.parseLong((String)row.get("start_ip"));

        if(row.get("end_ip") != null)
            endIp = Long.parseLong((String)row.get("end_ip"));

        for(long i= startIp; i<=endIp; i++)
        {
            TableRow outputRow = new TableRow();
            outputRow.set("start_ip", startIp);
            outputRow.set("ip", i);

            if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty()){

                endIp = Long.parseLong((String)row.get("end_ip"));

                TableRow atrRow = new TableRow();
                atrRow.set("id", "zippostal_code");
                atrRow.set("value", new String[] {(String)row.get("postal_code")});



                outputRow.set("atr", atrRow);
            }

            System.out.println(outputRow);

            c.output(outputRow);
        }
    }
}