I wanted to take advantage of the new BigQuery functionality of time partitioned tables, but am unsure this is currently possible in the 1.6 version of the Dataflow SDK.
我想利用时间分区表的新BigQuery功能,但我不确定这在1.6版本的Dataflow SDK中是否可行。
Looking at the BigQuery JSON API, to create a day partitioned table one needs to pass in a
查看BigQuery JSON API,要创建一个分区表,需要传入一个
"timePartitioning": { "type": "DAY" }
option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.
选项,但com.google.cloud.dataflow.sdk.io.BigQueryIO接口仅允许指定TableReference。
I thought that maybe I could pre-create the table, and sneak in a partition decorator via a BigQueryIO.Write.toTableReference lambda..? Is anyone else having success with creating/writing partitioned tables via Dataflow?
我想也许我可以预先创建表,并通过BigQueryIO.Write.toTableReference lambda潜入分区装饰器..?是否有其他人通过Dataflow创建/编写分区表成功?
This seems like a similar issue to setting the table expiration time which isn't currently available either.
这似乎与设置当前不可用的表过期时间类似。
6 个解决方案
#1
6
As Pavan says, it is definitely possible to write to partition tables with Dataflow. Are you using the DataflowPipelineRunner
operating in streaming mode or batch mode?
正如Pavan所说,绝对可以使用Dataflow写入分区表。您使用的是在流模式或批处理模式下运行的DataflowPipelineRunner吗?
The solution you proposed should work. Specifically, if you pre-create a table with date partitioning set up, then you can use a BigQueryIO.Write.toTableReference
lambda to write to a date partition. For example:
你提出的解决方案应该有效。具体来说,如果您预先创建一个设置了日期分区的表,那么您可以使用BigQueryIO.Write.toTableReference lambda来写入日期分区。例如:
/**
* A Joda-time formatter that prints a date in format like {@code "20160101"}.
* Threadsafe.
*/
private static final DateTimeFormatter FORMATTER =
DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
String.format("%s$%s", baseTableName, FORMATTER.print(instant));
#2
6
The approach I took (works in the streaming mode, too):
我采用的方法(也在流模式下工作):
- Define a custom window for the incoming record
- 为传入记录定义自定义窗口
-
Convert the window into the table/partition name
将窗口转换为表/分区名称
p.apply(PubsubIO.Read .subscription(subscription) .withCoder(TableRowJsonCoder.of()) ) .apply(Window.into(new TablePartitionWindowFn()) ) .apply(BigQueryIO.Write .to(new DayPartitionFunc(dataset, table)) .withSchema(schema) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) );
Setting the window based on the incoming data, the End Instant can be ignored, as the start value is used for setting the partition:
根据传入数据设置窗口,可以忽略End Instant,因为起始值用于设置分区:
public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
private IntervalWindow assignWindow(AssignContext context) {
TableRow source = (TableRow) context.element();
String dttm_str = (String) source.get("DTTM");
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
Instant start_point = Instant.parse(dttm_str,formatter);
Instant end_point = start_point.withDurationAdded(1000, 1);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public IntervalWindow getSideInputWindow(BoundedWindow window) {
if (window instanceof GlobalWindow) {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
return null;
}
Setting the table partition dynamically:
动态设置表分区:
public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
String destination = "";
public DayPartitionFunc(String dataset, String table) {
this.destination = dataset + "." + table+ "$";
}
@Override
public String apply(BoundedWindow boundedWindow) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyyMMdd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) boundedWindow).start());
return destination + dayString;
}}
Is there a better way of achieving the same outcome?
有没有更好的方法来实现相同的结果?
#3
3
I believe it should be possible to use the partition decorator when you are not using streaming. We are actively working on supporting partition decorators through streaming. Please let us know if you are seeing any errors today with non-streaming mode.
我相信当你不使用流媒体时应该可以使用分区装饰器。我们正积极致力于通过流媒体支持分区装饰器。如果您在非流媒体模式下看到任何错误,请告诉我们。
#4
1
Apache Beam version 2.0 supports sharding BigQuery output tables out of the box.
Apache Beam版本2.0支持开箱即用的分片BigQuery输出表。
#5
0
If you pass the table name in table_name_YYYYMMDD
format, then BigQuery will treat it as a sharded table, which can simulate partition table features. Refer the documentation: https://cloud.google.com/bigquery/docs/partitioned-tables
如果以table_name_YYYYMMDD格式传递表名,则BigQuery会将其视为分片表,可以模拟分区表功能。请参阅文档:https://cloud.google.com/bigquery/docs/partitioned-tables
#6
0
I have written data into bigquery partitioned tables through dataflow. These writings are dynamic as-in if the data in that partition already exists then I can either append to it or overwrite it.
我已经通过数据流将数据写入bigquery分区表。如果该分区中的数据已经存在,那么这些着作是动态的,然后我可以附加到它或覆盖它。
I have written the code in Python. It is a batch mode write operation into bigquery.
我用Python编写了代码。它是批量模式写入bigquery的操作。
client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV
if tableExists(client, table_ref):
job_config.autodetect = autoDetect
previous_rows = client.get_table(table_ref).num_rows
#assert previous_rows > 0
if allowJaggedRows is True:
job_config.allowJaggedRows = True
if allowFieldAddition is True:
job_config._properties['load']['schemaUpdateOptions'] = ['ALLOW_FIELD_ADDITION']
if isPartitioned is True:
job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
if schemaList is not None:
job_config.schema = schemaList
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
job_config.autodetect = autoDetect
job_config._properties['createDisposition'] = 'CREATE_IF_NEEDED'
job_config.schema = schemaList
if isPartitioned is True:
job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
if schemaList is not None:
table = bigquery.Table(table_ref, schema=schemaList)
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)
assert load_job.job_type == 'load'
load_job.result()
assert load_job.state == 'DONE'
It works fine.
它工作正常。
#1
6
As Pavan says, it is definitely possible to write to partition tables with Dataflow. Are you using the DataflowPipelineRunner
operating in streaming mode or batch mode?
正如Pavan所说,绝对可以使用Dataflow写入分区表。您使用的是在流模式或批处理模式下运行的DataflowPipelineRunner吗?
The solution you proposed should work. Specifically, if you pre-create a table with date partitioning set up, then you can use a BigQueryIO.Write.toTableReference
lambda to write to a date partition. For example:
你提出的解决方案应该有效。具体来说,如果您预先创建一个设置了日期分区的表,那么您可以使用BigQueryIO.Write.toTableReference lambda来写入日期分区。例如:
/**
* A Joda-time formatter that prints a date in format like {@code "20160101"}.
* Threadsafe.
*/
private static final DateTimeFormatter FORMATTER =
DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
String.format("%s$%s", baseTableName, FORMATTER.print(instant));
#2
6
The approach I took (works in the streaming mode, too):
我采用的方法(也在流模式下工作):
- Define a custom window for the incoming record
- 为传入记录定义自定义窗口
-
Convert the window into the table/partition name
将窗口转换为表/分区名称
p.apply(PubsubIO.Read .subscription(subscription) .withCoder(TableRowJsonCoder.of()) ) .apply(Window.into(new TablePartitionWindowFn()) ) .apply(BigQueryIO.Write .to(new DayPartitionFunc(dataset, table)) .withSchema(schema) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) );
Setting the window based on the incoming data, the End Instant can be ignored, as the start value is used for setting the partition:
根据传入数据设置窗口,可以忽略End Instant,因为起始值用于设置分区:
public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
private IntervalWindow assignWindow(AssignContext context) {
TableRow source = (TableRow) context.element();
String dttm_str = (String) source.get("DTTM");
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
Instant start_point = Instant.parse(dttm_str,formatter);
Instant end_point = start_point.withDurationAdded(1000, 1);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public IntervalWindow getSideInputWindow(BoundedWindow window) {
if (window instanceof GlobalWindow) {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
return null;
}
Setting the table partition dynamically:
动态设置表分区:
public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
String destination = "";
public DayPartitionFunc(String dataset, String table) {
this.destination = dataset + "." + table+ "$";
}
@Override
public String apply(BoundedWindow boundedWindow) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyyMMdd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) boundedWindow).start());
return destination + dayString;
}}
Is there a better way of achieving the same outcome?
有没有更好的方法来实现相同的结果?
#3
3
I believe it should be possible to use the partition decorator when you are not using streaming. We are actively working on supporting partition decorators through streaming. Please let us know if you are seeing any errors today with non-streaming mode.
我相信当你不使用流媒体时应该可以使用分区装饰器。我们正积极致力于通过流媒体支持分区装饰器。如果您在非流媒体模式下看到任何错误,请告诉我们。
#4
1
Apache Beam version 2.0 supports sharding BigQuery output tables out of the box.
Apache Beam版本2.0支持开箱即用的分片BigQuery输出表。
#5
0
If you pass the table name in table_name_YYYYMMDD
format, then BigQuery will treat it as a sharded table, which can simulate partition table features. Refer the documentation: https://cloud.google.com/bigquery/docs/partitioned-tables
如果以table_name_YYYYMMDD格式传递表名,则BigQuery会将其视为分片表,可以模拟分区表功能。请参阅文档:https://cloud.google.com/bigquery/docs/partitioned-tables
#6
0
I have written data into bigquery partitioned tables through dataflow. These writings are dynamic as-in if the data in that partition already exists then I can either append to it or overwrite it.
我已经通过数据流将数据写入bigquery分区表。如果该分区中的数据已经存在,那么这些着作是动态的,然后我可以附加到它或覆盖它。
I have written the code in Python. It is a batch mode write operation into bigquery.
我用Python编写了代码。它是批量模式写入bigquery的操作。
client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV
if tableExists(client, table_ref):
job_config.autodetect = autoDetect
previous_rows = client.get_table(table_ref).num_rows
#assert previous_rows > 0
if allowJaggedRows is True:
job_config.allowJaggedRows = True
if allowFieldAddition is True:
job_config._properties['load']['schemaUpdateOptions'] = ['ALLOW_FIELD_ADDITION']
if isPartitioned is True:
job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
if schemaList is not None:
job_config.schema = schemaList
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
job_config.autodetect = autoDetect
job_config._properties['createDisposition'] = 'CREATE_IF_NEEDED'
job_config.schema = schemaList
if isPartitioned is True:
job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
if schemaList is not None:
table = bigquery.Table(table_ref, schema=schemaList)
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)
assert load_job.job_type == 'load'
load_job.result()
assert load_job.state == 'DONE'
It works fine.
它工作正常。