Google Cloud Dataflow, BigQueryIO and NullPointerException on TableRow.get

Time: 2021-08-30 18:04:55

I'm new to GC Dataflow and didn't find a relevant answer here. Apologies if this has already been answered.

I'm trying to create a simple pipeline using the v2.0 SDK and am having trouble reading data into my PCollection using BigQueryIO. I am using the .fromQuery method, and I have tested the query in the BigQuery interface, where it works fine. The initial PCollection seems to be created without any issues, but when I then set up a simple ParDo function to convert the values from each TableRow into a PCollection of strings, I get a NullPointerException on the line of code that calls .get on the TableRow object.

Here is my code. (I'm probably missing something simple. I'm a total newbie at Pipeline programming. Any input would be most appreciated.)

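import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientAutocompletePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class);

    public static void main(String[] args) {
        // Create the pipeline from the command-line options
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // A step to read in the product names from a BigQuery table
        p.apply(BigQueryIO.read().fromQuery("SELECT name FROM [beaming-team-169321:Products.raw_product_data]"))

        .apply("ExtractProductNames", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Grab a row from the BigQuery results
                TableRow row = c.element();

                // Get the value of the "name" column from the table row.
                // NOTE: This is the line that is giving me the NullPointerException
                String productName = row.get("name").toString();

                // Make sure it isn't empty
                if (!productName.isEmpty()) {
                    c.output(productName);
                }
            }
        }));

        // Assumed: the snippet as posted ends above, so the chain is closed and the pipeline is run here.
        p.run();
    }
}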

The query definitely works in the BigQuery UI and the column called "name" is returned when I test the query. Why am I getting a NullPointerException on this line:

String productName = row.get("name").toString();

Any ideas?

2 Answers

#1


0  

This is a common problem when working with BigQuery and Dataflow (most likely the field is indeed null). If you are ok with using Scala, you could take a look at Scio (which is a Scala DSL for Dataflow) and its BigQuery IO.
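To illustrate why a null field produces this exception, here is a minimal sketch. It assumes only that TableRow behaves like a Map<String, Object>, so a plain HashMap stands in for it; the class name NullFieldDemo and the helper wouldThrow are hypothetical. The point is that get returns null both when the key is absent and when it is mapped to null, and calling toString() on that result throws.

```java
import java.util.HashMap;
import java.util.Map;

public class NullFieldDemo {
    // Hypothetical helper: true when row.get(column).toString() would throw a NullPointerException.
    static boolean wouldThrow(Map<String, Object> row, String column) {
        return row.get(column) == null;
    }

    public static void main(String[] args) {
        // Stand-in for a row in which "name" is present but null.
        Map<String, Object> nullCell = new HashMap<>();
        nullCell.put("name", null);

        // Stand-in for a row in which the "name" key is absent altogether.
        Map<String, Object> missingKey = new HashMap<>();

        // Stand-in for a row with a usable value.
        Map<String, Object> populated = new HashMap<>();
        populated.put("name", "widget");

        System.out.println(wouldThrow(nullCell, "name"));   // true
        System.out.println(wouldThrow(missingKey, "name")); // true
        System.out.println(wouldThrow(populated, "name"));  // false

        try {
            // Same shape as the failing line in the question.
            String productName = nullCell.get("name").toString();
            System.out.println(productName);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the question");
        }
    }
}
```

A query can return the "name" column in the BigQuery UI and still contain individual rows where that column is NULL, so a single such row is enough to fail the DoFn.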

#2


0  

Just make your code null safe. Replace this:

String productName = row.get("name").toString();

With something like this:

String productName = String.valueOf(row.get("name"));
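One caveat with String.valueOf: for a null argument it returns the four-character string "null", which then passes the question's isEmpty() check and would be emitted as a product name. The sketch below shows one possible alternative that skips null values explicitly. It again uses a plain Map<String, Object> as a stand-in for TableRow, and the class NullSafeName and method extractName are hypothetical names, not part of the Beam API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class NullSafeName {
    // Hypothetical helper: returns the "name" value as a string,
    // or Optional.empty() when it is absent, null, or an empty string.
    static Optional<String> extractName(Map<String, Object> row) {
        Object value = row.get("name");
        if (value == null) {
            return Optional.empty();
        }
        String name = value.toString();
        return name.isEmpty() ? Optional.empty() : Optional.of(name);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();

        // String.valueOf does not throw on null, but it yields the text "null".
        System.out.println(String.valueOf(row.get("name")));   // null
        System.out.println(extractName(row).isPresent());      // false

        row.put("name", "widget");
        System.out.println(extractName(row).get());             // widget
    }
}
```

Inside the DoFn from the question, the same idea amounts to checking row.get("name") for null before calling toString(), and calling c.output only when a non-empty value is present.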
