是否有任何形式可以写入BigQuery动态指定目标表的名称?

时间:2022-10-12 19:16:33

Is there any form to write to BigQuery specifying the name of destination tables dynamically?

是否有任何形式可以写入BigQuery动态指定目标表的名称?

Now I have:

我现在有:

bigQueryRQ
.apply(BigQueryIO.Write
    .named("Write")
    .to("project_name:dataset_name.table_name")
    .withSchema(Table.create_auditedTableSchema())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

But I need the "table_name" as a dynamic table name that depends on the "tablerow" data that I want to write.

但是我需要“table_name”作为动态表名,它取决于我想写的“tablerow”数据。

2 个解决方案

#1


3  

Unfortunately, we don't provide an API to name the BigQuery table in a data-dependent way. Generally speaking, data-dependent BigQuery table destination(s) may be error prone.

不幸的是,我们没有提供API来以数据依赖的方式命名BigQuery表。一般而言,依赖于数据的BigQuery表目标可能容易出错。

That said, we are working on improving flexibility in this area. No estimates at this time, but we hope to get this soon.

也就是说,我们正在努力提高这一领域的灵活性。目前还没有估计,但我们希望尽快得到这个。

#2


4  

I have the same problem. How about to group rows by tags, and apply BigQueryIO.Write for every group separately?

我也有同样的问题。如何按标签对行进行分组,并分别为每个组应用BigQueryIO.Write?

    public static class TagMarker extends DoFn<TableRow, TableRow> {

    private Map<String, TupleTag<TableRow>> tagMap;

    public TagMarker(Map<String, TupleTag<TableRow>> tagMap) {
        this.tagMap = tagMap;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow item = c.element();
        c.sideOutput(tagMap.get(getTagName(item)), item);
    }

    private String getTagName(TableRow row) {
        // There will be your logic of determinate table by row
        return "table" + ((String)row.get("msg")).substring(0, 1);
    }

}


private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> {

    @Override
    public PDone apply(PCollection<TableRow> input) {

        TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
        TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
        TupleTag<TableRow> tag3 = new TupleTag<TableRow>();

        Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
        tagMap.put("table1", mainTag);
        tagMap.put("table2", tag2);
        tagMap.put("table3", tag3);

        List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
        tags.add(tag2);
        tags.add(tag3);

        PCollectionTuple result = input.apply(
            ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
        );

        PDone done = null;
        for (String tableId : tagMap.keySet()) {
            done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
        }

        return done;
    }


    private PDone writeToGbq(String tableId, PCollection<TableRow> rows) {

        PDone done = rows
                .apply(BigQueryIO.Write.named("WriteToGbq")
                .to("<project>:<dataset>." + tableId)
                .withSchema(getSchema())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        return done;
    }

}

I am not sure about rewriting variable done. Is it correct? Can it brake rewriting to GBQ after fail.

我不确定重写变量。这是对的吗?失败后能不能重写GBQ。

And this way is suitable only if you know list of tables which we want write to before parsing rows.

只有在解析行之前知道要写入的表的列表时,这种方式才适用。

#1


3  

Unfortunately, we don't provide an API to name the BigQuery table in a data-dependent way. Generally speaking, data-dependent BigQuery table destination(s) may be error prone.

不幸的是,我们没有提供API来以数据依赖的方式命名BigQuery表。一般而言,依赖于数据的BigQuery表目标可能容易出错。

That said, we are working on improving flexibility in this area. No estimates at this time, but we hope to get this soon.

也就是说,我们正在努力提高这一领域的灵活性。目前还没有估计,但我们希望尽快得到这个。

#2


4  

I have the same problem. How about to group rows by tags, and apply BigQueryIO.Write for every group separately?

我也有同样的问题。如何按标签对行进行分组,并分别为每个组应用BigQueryIO.Write?

    public static class TagMarker extends DoFn<TableRow, TableRow> {

    private Map<String, TupleTag<TableRow>> tagMap;

    public TagMarker(Map<String, TupleTag<TableRow>> tagMap) {
        this.tagMap = tagMap;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow item = c.element();
        c.sideOutput(tagMap.get(getTagName(item)), item);
    }

    private String getTagName(TableRow row) {
        // There will be your logic of determinate table by row
        return "table" + ((String)row.get("msg")).substring(0, 1);
    }

}


private static class GbqWriter extends PTransform<PCollection<TableRow>, PDone> {

    @Override
    public PDone apply(PCollection<TableRow> input) {

        TupleTag<TableRow> mainTag = new TupleTag<TableRow>();
        TupleTag<TableRow> tag2 = new TupleTag<TableRow>();
        TupleTag<TableRow> tag3 = new TupleTag<TableRow>();

        Map<String, TupleTag<TableRow>> tagMap = new HashMap<String, TupleTag<TableRow>>();
        tagMap.put("table1", mainTag);
        tagMap.put("table2", tag2);
        tagMap.put("table3", tag3);

        List<TupleTag<?>> tags = new ArrayList<TupleTag<?>>();
        tags.add(tag2);
        tags.add(tag3);

        PCollectionTuple result = input.apply(
            ParDo.withOutputTags(mainTag, TupleTagList.of(tags)).of(new TagMarker(tagMap))
        );

        PDone done = null;
        for (String tableId : tagMap.keySet()) {
            done = writeToGbq(tableId, result.get(tagMap.get(tableId)).setCoder(TableRowJsonCoder.of()));
        }

        return done;
    }


    private PDone writeToGbq(String tableId, PCollection<TableRow> rows) {

        PDone done = rows
                .apply(BigQueryIO.Write.named("WriteToGbq")
                .to("<project>:<dataset>." + tableId)
                .withSchema(getSchema())
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        );

        return done;
    }

}

I am not sure about rewriting variable done. Is it correct? Can it brake rewriting to GBQ after fail.

我不确定重写变量。这是对的吗?失败后能不能重写GBQ。

And this way is suitable only if you know list of tables which we want write to before parsing rows.

只有在解析行之前知道要写入的表的列表时,这种方式才适用。