I have a job that among other things also inserts some of the data it reads from files into BigQuery table for later manual analysis.
我有一项工作,除其他事项外,还将从文件中读取的一些数据插入到BigQuery表中,以便以后进行手动分析。
It fails with the following error:
它失败并出现以下错误:
job error: Too many sources provided: 10001. Limit is 10000., error: Too many sources provided: 10001. Limit is 10000.
What does it refer to as "source"? Is it a file or a pipeline step?
它被称为“来源”是什么?它是文件还是管道步骤?
Thanks, G
谢谢,G
3 个解决方案
#1
2
I'm guessing the error is coming from BigQuery and means that we are trying to upload too many files when we create your output table.
我猜这个错误来自BigQuery,这意味着我们在创建输出表时尝试上传太多文件。
Could you provide some more details on the error / context (like a snippet of the commandline output (if using the BlockingDataflowPipelineRunner) so I can confirm? A jobId would also be helpful.
你能提供一些关于错误/上下文的更多细节(比如命令行输出的片段(如果使用BlockingDataflowPipelineRunner),那么我可以确认吗?jobId也会有所帮助。
Is there something about your pipeline structure that is going to result in a large number of output files? That could either be a large amount of data or perhaps finely sharded input files without a subsequent GroupByKey operation (which would let us reshard the data into larger pieces).
你的管道结构有什么东西会导致大量的输出文件?这可能是大量数据,也可能是精细分片的输入文件而没有后续的GroupByKey操作(这会让我们将数据重新分成更大的部分)。
#2
1
The note in In Google Cloud Dataflow BigQueryIO.Write occur Unknown Error (http code 500) mitigates this issue:
In Google Cloud Dataflow BigQueryIO.Write中的注释发生未知错误(http代码500)缓解此问题:
Dataflow SDK for Java 1.x: as a workaround, you can enable this experiment in : --experiments=enable_custom_bigquery_sink
Dataflow SDK for Java 1.x:作为一种变通方法,您可以在以下位置启用此实验: - instperiments = enable_custom_bigquery_sink
In Dataflow SDK for Java 2.x, this behavior is default and no experiments are necessary.
在Dataflow SDK for Java 2.x中,此行为是默认行为,不需要进行任何实验。
Note that in both versions, temporary files in GCS may be left over if your job fails.
请注意,在两个版本中,如果作业失败,GCS中的临时文件可能会遗留下来。
#3
0
public static class ForceGroupBy <T> extends PTransform<PCollection<T>, PCollection<KV<T, Iterable<Void>>>> {
private static final long serialVersionUID = 1L;
@Override
public PCollection<KV<T, Iterable<Void>>> apply(PCollection<T> input) {
PCollection<KV<T,Void>> syntheticGroup = input.apply(
ParDo.of(new DoFn<T,KV<T,Void>>(){
private static final long serialVersionUID = 1L;
@Override
public void processElement(
DoFn<T, KV<T, Void>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(),(Void)null));
} }));
return syntheticGroup.apply(GroupByKey.<T,Void>create());
}
}
#1
2
I'm guessing the error is coming from BigQuery and means that we are trying to upload too many files when we create your output table.
我猜这个错误来自BigQuery,这意味着我们在创建输出表时尝试上传太多文件。
Could you provide some more details on the error / context (like a snippet of the commandline output (if using the BlockingDataflowPipelineRunner) so I can confirm? A jobId would also be helpful.
你能提供一些关于错误/上下文的更多细节(比如命令行输出的片段(如果使用BlockingDataflowPipelineRunner),那么我可以确认吗?jobId也会有所帮助。
Is there something about your pipeline structure that is going to result in a large number of output files? That could either be a large amount of data or perhaps finely sharded input files without a subsequent GroupByKey operation (which would let us reshard the data into larger pieces).
你的管道结构有什么东西会导致大量的输出文件?这可能是大量数据,也可能是精细分片的输入文件而没有后续的GroupByKey操作(这会让我们将数据重新分成更大的部分)。
#2
1
The note in In Google Cloud Dataflow BigQueryIO.Write occur Unknown Error (http code 500) mitigates this issue:
In Google Cloud Dataflow BigQueryIO.Write中的注释发生未知错误(http代码500)缓解此问题:
Dataflow SDK for Java 1.x: as a workaround, you can enable this experiment in : --experiments=enable_custom_bigquery_sink
Dataflow SDK for Java 1.x:作为一种变通方法,您可以在以下位置启用此实验: - instperiments = enable_custom_bigquery_sink
In Dataflow SDK for Java 2.x, this behavior is default and no experiments are necessary.
在Dataflow SDK for Java 2.x中,此行为是默认行为,不需要进行任何实验。
Note that in both versions, temporary files in GCS may be left over if your job fails.
请注意,在两个版本中,如果作业失败,GCS中的临时文件可能会遗留下来。
#3
0
public static class ForceGroupBy <T> extends PTransform<PCollection<T>, PCollection<KV<T, Iterable<Void>>>> {
private static final long serialVersionUID = 1L;
@Override
public PCollection<KV<T, Iterable<Void>>> apply(PCollection<T> input) {
PCollection<KV<T,Void>> syntheticGroup = input.apply(
ParDo.of(new DoFn<T,KV<T,Void>>(){
private static final long serialVersionUID = 1L;
@Override
public void processElement(
DoFn<T, KV<T, Void>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(),(Void)null));
} }));
return syntheticGroup.apply(GroupByKey.<T,Void>create());
}
}