I have created a Dataflow pipeline that takes input from Datastore and applies a transform to convert each entity to a BigQuery TableRow. I attach a timestamp to each element in a transform. A window of one day is then applied to the PCollection, and the windowed output is written to a partition of a BigQuery table using Apache Beam's BigQueryIO.
Before writing to BigQuery, the pipeline reshuffles via a random key as an intermediate step to avoid fusion.
The pipeline behaviour is:
1. For 2.8 million entities in the input:
   Total vCPU time: 5.148 vCPU hr
   Time to complete job: 53 min 9 sec
   Current workers: 27
   Target workers: 27
   Job ID: 2018-04-04_04_20_34-1951473901769814139
2. For 7 million entities in the input:
   Total vCPU time: 247.772 vCPU hr
   Time to complete job: 3 hr 45 min
   Current workers: 69
   Target workers: 1000
   Job ID: 2018-04-02_21_59_47-8636729278179820259
I can't understand why the second case takes so much longer to finish and consumes so many more vCPU hours.
At a high level, the dataflow pipeline is:
// Read from datastore
PCollection<Entity> entities =
pipeline.apply("ReadFromDatastore",
DatastoreIO.v1().read().withProjectId(options.getProject())
.withQuery(query).withNamespace(options.getNamespace()));
// Apply processing to convert it to BigQuery TableRow
PCollection<TableRow> tableRow =
entities.apply("ConvertToTableRow", ParDo.of(new ProcessEntityFn()));
// Attach a timestamp to each TableRow element, then apply one-day windowing
PCollection<TableRow> tableRowWindow =
tableRow.apply("tableAddTimestamp", ParDo.of(new ApplyTimestampFn())).apply(
"tableApplyWindow",
Window.<TableRow> into(CalendarWindows.days(1).withTimeZone(
DateTimeZone.forID(options.getTimeZone()))));
//Apply reshuffle with random key for avoiding fusion
PCollection<TableRow> ismTableRowWindow =
tableRowWindow.apply("ReshuffleViaRandomKey",
Reshuffle.<TableRow> viaRandomKey());
// Write windowed output to BigQuery partitions
ismTableRowWindow.apply(
"WriteTableToBQ",
BigQueryIO
.writeTableRows()
.withSchema(BigqueryHelper.getSchema())
.to(TableRefPartition.perDay(options.getProject(),
options.getBigQueryDataset(), options.getTableName()))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
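
ApplyTimestampFn and TableRefPartition are referenced above but not shown. For completeness, here is a minimal sketch of what ApplyTimestampFn could look like, assuming each row carries an ISO-8601 created_at field (a hypothetical field name) holding the event time:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

public class ApplyTimestampFn extends DoFn<TableRow, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        TableRow row = c.element();
        // "created_at" is a hypothetical field; the real pipeline derives
        // the event time from some attribute of the Datastore entity.
        Instant eventTime = Instant.parse((String) row.get("created_at"));
        // Moving timestamps forward is always allowed; moving them backwards
        // would require overriding getAllowedTimestampSkew().
        c.outputWithTimestamp(row, eventTime);
    }
}

And a sketch of what TableRefPartition.perDay might do, since the helper is custom: map each element's one-day window to a BigQuery day-partition decorator (table$yyyyMMdd). This is an assumption about its behaviour, using BigQueryIO's SerializableFunction-based to() overload:

import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

static SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> perDay(
        String project, String dataset, String table) {
    DateTimeFormatter day = DateTimeFormat.forPattern("yyyyMMdd");
    return input -> {
        // CalendarWindows.days(1) produces IntervalWindows; the window's
        // start date names the BigQuery partition.
        IntervalWindow window = (IntervalWindow) input.getWindow();
        return new TableDestination(
            project + ":" + dataset + "." + table + "$" + day.print(window.start()),
            null);
    };
}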
1 Answer
#1
I saw that you posted a similar question here, and that you have now added this step to your code:
//Apply reshuffle with random key for avoiding fusion
...
As someone already told you in the other question:
"The OOM might be symptomatic of a hot key"
So in this case it looks like something similar is still happening; there is further information about hot key problems here:
If that is the case and some worker is stuck, then the number of entities vs. the time to complete the job does not have to scale linearly, and the vCPU consumption becomes more a matter of optimizing the code to avoid the hot key issue.
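
If the pipeline does group or combine by some key downstream, one common mitigation is Beam's built-in hot-key fanout, which pre-aggregates a hot key's values across several workers before the final combine. A minimal sketch (the keyed PCollection counts and the Sum combine are assumptions, since the question's code does not show a grouping step):

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// withHotKeyFanout(10) splits each key's values across 10 intermediate
// sub-keys, combines them in parallel, and then merges the partial
// results, so a single hot key cannot pin one worker.
PCollection<KV<String, Long>> summed =
    counts.apply("SumPerKey",
        Combine.<String, Long, Long>perKey(Sum.ofLongs()).withHotKeyFanout(10));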