I have a problem retrieving data from a BigQuery table inside a DoFn. I can't find an example of how to extract values from a TypedRead.
This is a simplified pipeline. I would like to check whether a record with the target SSN exists in a BigQuery table. In the real pipeline the target SSN will be received via Pub/Sub; here I have replaced it with a list of strings.
final BigQueryIoTestOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryIoTestOptions.class);
final List<String> SSNs = Arrays.asList("775-89-3939");
Pipeline p = Pipeline.create(options);
PCollection<String> ssnCollection =
    p.apply("GetSSNParams", Create.of(SSNs)).setCoder(StringUtf8Coder.of());
ssnCollection
    .apply("SelectFromBQ", ParDo.of(new DoFn<String, TypedRead<TableRow>>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        TypedRead<TableRow> tr =
            BigQueryIO.readTableRows()
                .fromQuery("SELECT pid19PatientSSN FROM dataset.table where pid19PatientSSN = '" + c.element() + "' LIMIT 1");
        c.output(tr);
      }
    }))
    .apply("ParseResponseFromBigQuery", ParDo.of(new DoFn<TypedRead<TableRow>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        System.out.println(c.element().toString());
      }
    }));
p.run();
1 Answer
BigQueryIO only returns a PCollection. You can read each row's fields through its entry set, as in the BigQueryIO example below, or map the rows to your own objects.
If you want to query BigQuery in the middle of your pipeline, use the BigQuery client class instead of BigQueryIO.
BigQueryIO Example:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
PCollection<TableRow> result =
    pipeline.apply(BigQueryIO.readTableRows()
        .fromQuery("SELECT id, name FROM [project-test:test_data.test] LIMIT 1"));
result.apply(MapElements.via(new SimpleFunction<TableRow, Void>() {
  @Override
  public Void apply(TableRow obj) {
    System.out.println("***" + obj);
    // Print every column name and value of the row.
    obj.entrySet().forEach(
        (k) -> {
          System.out.println(k.getKey() + " :" + k.getValue());
        });
    return null;
  }
}));
pipeline.run().waitUntilFinish();
BigQuery Example:
// [START bigquery_simple_app_client]
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// [END bigquery_simple_app_client]
// [START bigquery_simple_app_query]
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
            "SELECT "
                + "CONCAT('https://stackoverflow.com/questions/', CAST(id as STRING)) as url, "
                + "view_count "
                + "FROM `bigquery-public-data.stackoverflow.posts_questions` "
                + "WHERE tags like '%google-bigquery%' "
                + "ORDER BY favorite_count DESC LIMIT 10")
        // Use standard SQL syntax for queries.
        // See: https://cloud.google.com/bigquery/sql-reference/
        .setUseLegacySql(false)
        .build();
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Wait for the query to complete.
queryJob = queryJob.waitFor();
// Check for errors
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}
// [END bigquery_simple_app_query]
// [START bigquery_simple_app_print]
// Get the results.
TableResult result = queryJob.getQueryResults();
// Print all pages of the results.
for (FieldValueList row : result.iterateAll()) {
  String url = row.get("url").getStringValue();
  long viewCount = row.get("view_count").getLongValue();
  System.out.printf("url: %s views: %d%n", url, viewCount);
}
// [END bigquery_simple_app_print]
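Applied to the question, the client-library approach could look roughly like the sketch below. It is only an illustration under some assumptions: the class name SsnExistsFn and the KV<String, Boolean> output type are made up for this example, the table and column names are the placeholders from the question, and the google-cloud-bigquery client library is assumed to be on the worker classpath next to the Beam SDK. The SSN is passed as a named query parameter rather than concatenated into the SQL string.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Hypothetical DoFn: emits (ssn, exists) for each incoming SSN. */
public class SsnExistsFn extends DoFn<String, KV<String, Boolean>> {

  // Placeholder table and column names copied from the question.
  static final String QUERY =
      "SELECT pid19PatientSSN FROM `dataset.table` WHERE pid19PatientSSN = @ssn LIMIT 1";

  // The client is not serializable, so it is created on the worker in @Setup.
  private transient BigQuery bigquery;

  @Setup
  public void setup() {
    bigquery = BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(@Element String ssn, OutputReceiver<KV<String, Boolean>> out)
      throws InterruptedException {
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(QUERY)
            // Named parameters require standard SQL.
            .addNamedParameter("ssn", QueryParameterValue.string(ssn))
            .setUseLegacySql(false)
            .build();
    // Runs the query and waits for it to finish.
    TableResult result = bigquery.query(queryConfig);
    // The record exists if the query returned at least one row.
    boolean exists = result.iterateAll().iterator().hasNext();
    out.output(KV.of(ssn, exists));
  }
}
```

It would be wired into the pipeline from the question with something like ssnCollection.apply("CheckSsnInBQ", ParDo.of(new SsnExistsFn())). Keep in mind that this issues one query per element, so for a high-volume Pub/Sub stream you may run into BigQuery quotas or cost concerns.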