Why is writing to BigQuery using Dataflow EXTREMELY slow?

Posted: 2022-04-24 19:15:43

I can stream inserts directly into BigQuery at a speed of about 10,000 inserts per second, but when I try to insert using Dataflow, the 'ToBqRow' step (given below) is EXTREMELY slow. Barely 50 rows per 10 minutes, and that is with 4 workers. Any idea why? Here's the relevant code:


PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String rowJson = c.element();
                try {
                    TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                    Status status = TwitterObjectFactory.createStatus(rowJson);
                    if (status == null) {
                        TweetsWriter.LOGGER.error("Status is null");
                    } else {
                        TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                    }
                    c.output(status);
                    TweetsWriter.LOGGER.debug("Status: " + status.getId());
                } catch (Exception e) {
                    TweetsWriter.LOGGER.error("Status creation from JSON failed: " + e.getMessage());
                }
            }
        }));

statuses
        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object) null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
                c.output(row);
            }
        }))
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                .withSchema(schema)
                .withMethod(Method.STREAMING_INSERTS)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();

1 Answer

#1


Turns out BigQuery under Dataflow is NOT slow. The problem was that 'status.getPlace().getCountryCode()' was returning null, so the step was throwing a NullPointerException that I couldn't see anywhere in the log! Clearly, Dataflow logging needs to improve. It's running really well now. As soon as a message arrives on the topic, it gets written to BigQuery almost instantaneously!
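A defensive null check around `getPlace()` avoids this kind of crash, since many tweets carry no location data at all. Below is a minimal, standalone sketch of the guard pattern; the `Place` and `Status` classes here are simplified hypothetical stand-ins for twitter4j's real types, just to illustrate the idea:

```java
// Simplified stand-in for twitter4j's Place (hypothetical, for illustration)
class Place {
    String getCountryCode() { return "US"; }
}

// Simplified stand-in for twitter4j's Status (hypothetical, for illustration)
class Status {
    private final Place place; // null when the tweet has no location data
    Status(Place place) { this.place = place; }
    Place getPlace() { return place; }
}

public class PlaceGuard {
    // Null-safe extraction: returns null instead of throwing a
    // NullPointerException when the tweet has no Place attached.
    static String countryCodeOrNull(Status status) {
        Place place = status.getPlace();
        return place == null ? null : place.getCountryCode();
    }

    public static void main(String[] args) {
        System.out.println(countryCodeOrNull(new Status(new Place()))); // US
        System.out.println(countryCodeOrNull(new Status(null)));        // null
    }
}
```

In the actual pipeline, the same guard would go inside the 'ToBQRow' DoFn before the `row.set("CountryCode", ...)` and `row.set("Country", ...)` calls.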
