My use case is simple: read event logs from a Pub/Sub subscription, parse them, and save them into BigQuery. Because the number of events is expected to grow significantly and I am working with an unbounded data source, I decided to configure sharding in BigQuery: events are stored into daily tables based on the timestamp from the event data (what the Beam documentation calls "event time"). My question is: do I need to configure windowing in my case, or can I just leave the default configuration, which implicitly uses the global window? I ask because most of the examples of BigQuery sharding I have found assume a windowing configuration. But since I am not using any grouping operations such as GroupByKey or Combine, it looks like I should be fine without any windowing configuration. Or is there a reason to use windowing anyway; does it affect how BigQueryIO performs, for example?
The way I do sharding now is below.
static class TableNamingFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
  // One shared formatter; Joda-Time's DateTimeFormatter is thread-safe.
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

  @Override
  public TableDestination apply(ValueInSingleWindow<TableRow> input) {
    TableReference reference = new TableReference();
    reference.setProjectId("test-project");
    reference.setDatasetId("event_log");
    // Shard by the UTC day of the event's own timestamp ("event time").
    DateTime timestamp = new DateTime(input.getValue().get("event_timestamp"), DateTimeZone.UTC);
    reference.setTableId("events_" + FORMATTER.print(timestamp));
    return new TableDestination(reference, null);
  }
}
// And then:
eventRows.apply("BigQueryWrite", BigQueryIO.writeTableRows()
    .to(new TableNamingFn())
    .withSchema(EventSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
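For reference, the naming function above resolves each event to a daily table id such as events_20180409. The date formatting itself can be sketched in isolation with the standard library (java.time swapped in for Joda-Time here; class and method names are hypothetical):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyTableName {
    // Mirrors TableNamingFn: one table per UTC day of the event timestamp.
    static String dailyTableId(Instant eventTimestamp) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
                .withZone(ZoneOffset.UTC);
        return "events_" + fmt.format(eventTimestamp);
    }

    public static void main(String[] args) {
        // An event that occurred at 2018-04-09T12:30:00Z lands in events_20180409.
        System.out.println(dailyTableId(Instant.parse("2018-04-09T12:30:00Z")));
    }
}
```

Note that the zone passed to withZone matters: two events a few hours apart can land in different tables depending on whether the day boundary is taken in UTC or a local zone.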
1 Answer
#1
It looks like you are trying to shard the table by date; have you considered using a date-partitioned table instead? You could update where you set your table id to use the partition decorator, something like:
reference.setTableId("events$" + formatter.print(timestamp));
This article covers using BigQuery's partitioned tables with Apache Beam. In particular this snippet of code is probably what you want to use: https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition
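Applied to the naming function from the question, the decorator changes only the separator: events$20180409 addresses the 2018-04-09 partition of a single date-partitioned events table, rather than a separate events_20180409 table. A minimal sketch of that id construction, using standard-library java.time (names here are hypothetical, not from the question's pipeline):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionDecorator {
    // "table$yyyyMMdd" is BigQuery's decorator syntax for writing to one
    // partition of a date-partitioned table.
    static String partitionedTableId(Instant eventTimestamp) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
                .withZone(ZoneOffset.UTC);
        return "events$" + fmt.format(eventTimestamp);
    }

    public static void main(String[] args) {
        System.out.println(partitionedTableId(Instant.parse("2018-04-09T12:30:00Z")));
    }
}
```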