I'm experimenting with BigQueryIO writes using file loads. My load trigger is set to 18 hours, and I'm ingesting data from Kafka with a fixed daily window.
Based on https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L213-L231, it seems the intended behavior is to offload rows to the filesystem once at least 500K records accumulate in a pane.
I managed to produce ~600K records and waited around 2 hours to see whether the rows were uploaded to GCS, but nothing was there. I also noticed that the "GroupByDestination" step in "BatchLoads" shows 0 under "Output collections" size.
When I use a smaller load trigger, all seems fine. Shouldn't AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT) have fired?
Here is the code for writing to BigQuery:
BigQueryIO
  .writeTableRows()
  .to(new SerializableFunction[ValueInSingleWindow[TableRow], TableDestination]() {
    override def apply(input: ValueInSingleWindow[TableRow]): TableDestination = {
      val startWindow = input.getWindow.asInstanceOf[IntervalWindow].start()
      val dayPartition = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(startWindow)
      new TableDestination("myproject_id:mydataset_id.table$" + dayPartition, null)
    }
  })
  .withMethod(Method.FILE_LOADS)
  .withCreateDisposition(CreateDisposition.CREATE_NEVER)
  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
  .withSchema(BigQueryUtils.schemaOf[MySchema])
  .withTriggeringFrequency(Duration.standardHours(18))
  .withNumFileShards(10)
The job id is 2018-02-16_14_34_54-7547662103968451637. Thanks in advance.
1 Answer
#1

Panes are per key per window, and BigQueryIO.write() with dynamic destinations uses the destination as the key under the hood, so the "500K elements in a pane" trigger applies per destination per window.
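This can be illustrated with a small plain-Java simulation (not Beam code; the PaneCountSketch class, the three-way partition split, and the day-partition key names are all hypothetical): if ~600K records are spread across several day partitions, no single (destination, window) pane reaches the 500K element-count threshold, so only the processing-time trigger remains.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: BatchLoads' AfterPane.elementCountAtLeast(...) trigger
// counts elements per (destination, window) pane, not across the whole input.
public class PaneCountSketch {
    // Value of FILE_TRIGGERING_RECORD_COUNT in Beam 2.2.0's BatchLoads.
    static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;

    // Count elements per destination key, mimicking the implicit grouping
    // that BatchLoads performs on the dynamic destination.
    static Map<String, Integer> paneCounts(int totalRecords, int numPartitions) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < totalRecords; i++) {
            String key = "table$day" + (i % numPartitions); // hypothetical partitions
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 600K records spread evenly over three daily partitions:
        // each pane holds only 200K elements, below the 500K threshold,
        // so the element-count trigger never fires.
        Map<String, Integer> counts = paneCounts(600_000, 3);
        int maxPane = counts.values().stream().max(Integer::compare).orElse(0);
        System.out.println(maxPane < FILE_TRIGGERING_RECORD_COUNT); // prints true
    }
}
```

In other words, producing 600K records in total is not enough: a single destination-and-window pane has to accumulate 500K records on its own, or you have to wait out the 18-hour triggering frequency.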