I'm currently writing a Java utility to import a few CSV files from GCS into BigQuery. I could easily achieve this with bq load, but I wanted to do it using a Dataflow job. So I'm using Dataflow's Pipeline and a ParDo transform (which returns a TableRow to pass to BigQueryIO), and I have created a StringToRowConverter() for the transformation. Here the actual problem starts: I am forced to specify the schema for the destination table, even though I don't want to create a new table if it doesn't exist - I only want to load data. And I do not want to set the column names on the TableRow manually, as I have about 600 columns.
public class StringToRowConverter extends DoFn<String, TableRow> {

    private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = new TableRow();
        row.set("DO NOT KNOW THE COLUMN NAME", c.element());
        c.output(row);
    }
}
Moreover, it can be assumed that the table already exists in the BigQuery dataset (I don't need to create it) and that the CSV file contains the columns in the correct order.
If there's no workaround for this scenario and the column names are required for the load, then I can include them in the first row of the CSV file.
Any help would be appreciated.
1 Answer
#1
To avoid creating the table, you should use BigQueryIO.Write.CreateDisposition.CREATE_NEVER on the BigQueryIO.Write transform when configuring the pipeline. Source: https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write
You don't need to know the BigQuery table schema upfront; you can discover it dynamically. For instance, you can use the BigQuery API (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get) to query the table schema and pass it as a parameter to the StringToRowConverter class (see the sketch just below). Another option, assuming the first row is a header, is to use that first row to map the columns of the rest of the file correctly.
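For the first option, here is a minimal sketch of looking up the column names of an existing table. Note it uses the google-cloud-bigquery client library rather than the Dataflow SDK used below, and the fetchColumnNames helper and its project/dataset/table parameters are just illustrative:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;

import java.util.ArrayList;
import java.util.List;

public class SchemaFetcher {
    // Returns the column names of an existing BigQuery table, so they can be
    // handed to StringToRowConverter instead of being hard-coded 600 times.
    public static List<String> fetchColumnNames(String project, String dataset, String table) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        Table t = bigquery.getTable(TableId.of(project, dataset, table));
        List<String> names = new ArrayList<>();
        for (Field field : t.getDefinition().getSchema().getFields()) {
            names.add(field.getName());
        }
        return names;
    }
}

The resulting list could then be passed to StringToRowConverter's constructor and used in place of the header row.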
The code below implements the second approach and also configures the output to append to an existing BigQuery table.
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

import java.util.Arrays;

public class DFJob {

    public static class StringToRowConverter extends DoFn<String, TableRow> {
        // Column names captured from the CSV header row.
        private String[] columnNames;
        private boolean isFirstRow = true;

        @Override
        public void processElement(ProcessContext c) {
            String[] parts = c.element().split(",");
            if (isFirstRow) {
                // Remember the header values as the column names.
                columnNames = Arrays.copyOf(parts, parts.length);
                isFirstRow = false;
            } else {
                // Map each CSV field to the column name in the same position.
                TableRow row = new TableRow();
                for (int i = 0; i < parts.length; i++) {
                    row.set(columnNames[i], parts[i]);
                }
                c.output(row);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);

        // BigQueryIO.Write.to() expects a table spec of the form "project:dataset.table".
        p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
                .apply(ParDo.of(new StringToRowConverter()))
                .apply(BigQueryIO.Write.to("myTable")
                        // Never create the table; the load fails if it does not exist.
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        // Append rows to the existing table.
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}
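One caveat with the header-based approach: the isFirstRow flag is per DoFn instance, and TextIO may split a large file into bundles processed on different workers, so there is no guarantee that the header row is the first element each instance sees. This trick is only reliable for small files read in a single bundle; for larger imports, fetching the schema through the BigQuery API as sketched above is the safer route.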