Setting the priority of Google Dataflow pipelines

Time: 2022-02-04 15:36:36

I'm new to Google Dataflow. I have two Dataflow pipelines that execute two different jobs. One is an ETL process that loads into BigQuery, and the other reads from BigQuery to aggregate data for a report. I want to run the ETL pipeline first, and only after it completes run the report pipeline, so that the data in BigQuery is fully up to date.


I tried running both in one pipeline, but it didn't help. For now I have to run the ETL pipeline manually first and then run the report pipeline.


Can anybody give me some advice on running the two jobs in one pipeline? Thanks.


1 solution

#1

You should be able to do both of these in a single pipeline. Rather than writing to BigQuery and then trying to read that back in and generate the report, consider just using the intermediate data for both purposes. For example:


PCollection<Input> input = /* ... */;
// Perform your transformation logic
PCollection<Intermediate> intermediate = input
  .apply(...)
  .apply(...);
// Convert the transformed results into table rows and
// write those to BigQuery.
intermediate
  .apply(ParDo.of(new IntermediateToTableRowETL()))
  .apply(BigQueryIO.write(...));
// Generate your report over the transformed data
intermediate
  .apply(...)
  .apply(...);