为什么使用Dataflow EXTREMELY写入Bigquery很慢?

时间:2022-04-24 19:15:43

I can stream inserts directly into BigQuery at a speed of about 10,000 inserts per second but when I try to insert using Dataflow the 'ToBqRow' step (given below) is EXTREMELY slow. Barely 50 rows per 10 minutes and this is with 4 Workers. Any idea why? Here's the relevant code:


PCollection<Status> statuses = p
        .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
        .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
    public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
            String rowJson = c.element();

            try {
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) {
                    TweetsWriter.LOGGER.error("Status is null");
                } else {
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
            } catch (Exception var4) {
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());


        .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = new TableRow();
                Status status = c.element();
                row.set("Id", status.getId());
                row.set("Text", status.getText());
                row.set("RetweetCount", status.getRetweetCount());
                row.set("FavoriteCount", status.getFavoriteCount());
                row.set("Language", status.getLang());
                row.set("ReceivedAt", (Object)null);
                row.set("UserId", status.getUser().getId());
                row.set("CountryCode", status.getPlace().getCountryCode());
                row.set("Country", status.getPlace().getCountry());
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)


1 个解决方案



Turns out Bigquery under Dataflow is NOT slow. Problem was, 'status.getPlace().getCountryCode()' was returning NULL so it was throwing NullPointerException that I couldn't see anywhere in the log! Clearly, Dataflow logging needs to improve. It's running really well now. As soon as message comes in the topic, almost instantaneously it gets written to BigQuery!




Turns out Bigquery under Dataflow is NOT slow. Problem was, 'status.getPlace().getCountryCode()' was returning NULL so it was throwing NullPointerException that I couldn't see anywhere in the log! Clearly, Dataflow logging needs to improve. It's running really well now. As soon as message comes in the topic, almost instantaneously it gets written to BigQuery!
