Google Dataflow: how to parse a large file containing a valid JSON array from FileIO.ReadableFile

Time: 2021-07-19 15:35:28


In my pipeline, the FileIO.readMatches() transform reads a large JSON file (around 300-400 MB) containing a valid JSON array and passes a FileIO.ReadableFile object to the next transform. My task is to read each JSON object from that array, add new properties, and output the result to the next transform.

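For reference, here is roughly how the relevant part of the pipeline is wired up (the file pattern and the DoFn name below are placeholders, not my actual code):

    // Match the input files and turn each match into a FileIO.ReadableFile;
    // the bucket path is a placeholder
    PCollection<FileIO.ReadableFile> files = pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/input/*.json"))
        .apply(FileIO.readMatches());
    // ParseJsonFn is the DoFn that contains the parsing code shown below
    files.apply(ParDo.of(new ParseJsonFn()));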

At the moment my code to parse the JSON file looks like this:

        // file is a FileIO.ReadableFile object
        // readFullyAsBytes() loads the entire file into memory at once
        InputStream bis = new ByteArrayInputStream(file.readFullyAsBytes());
        // I'm using the Gson library to parse JSON
        JsonReader reader = new JsonReader(new InputStreamReader(bis, "UTF-8"));
        JsonParser jsonParser = new JsonParser();
        reader.beginArray();
        while (reader.hasNext()) {
            JsonObject jsonObject = jsonParser.parse(reader).getAsJsonObject();
            jsonObject.addProperty("Somename", "Somedata");
            // processContext is a ProcessContext object
            processContext.output(jsonObject.toString());
        }
        reader.close();

In this case the whole content of the file is held in memory, which can lead to a java.lang.OutOfMemoryError. I'm looking for a way to read the JSON objects one by one without keeping the whole file in memory. A possible solution is to use the open() method of FileIO.ReadableFile, which returns a ReadableByteChannel, but I'm not sure how to use that channel to read a single JSON object at a time.


Updated solution: this is my updated code, which reads the file line by line:


    ReadableByteChannel readableByteChannel = null;
    InputStream inputStream = null;
    BufferedReader bufferedReader = null;
    try {
        // file is a FileIO.ReadableFile
        readableByteChannel = file.open();
        inputStream = Channels.newInputStream(readableByteChannel);
        bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (line.length() > 1) {
                // my final output should contain both the filename and the line
                processContext.output(fileName + line);
            }
        }
    } catch (IOException ex) {
        logger.error("Exception during reading the file", ex);
    } finally {
        IOUtils.closeQuietly(bufferedReader);
        IOUtils.closeQuietly(inputStream);
    }

I've found that this solution doesn't work with Dataflow running on an n1-standard-1 machine, where it throws java.lang.OutOfMemoryError: GC overhead limit exceeded, but it works correctly on an n1-standard-2 machine.


1 Answer

#1

ReadableByteChannel is part of the Java NIO API (introduced in Java 1.4). Java provides a way to convert it to an InputStream: InputStream bis = Channels.newInputStream(file.open()); - I believe this is the only change you need to make.

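Applied to the snippet from the question, that change might look like the following sketch (the parsing logic is unchanged; only the source of the InputStream differs):

    // file is a FileIO.ReadableFile object
    // Stream from the channel instead of materializing the file with readFullyAsBytes()
    InputStream bis = Channels.newInputStream(file.open());
    JsonReader reader = new JsonReader(new InputStreamReader(bis, "UTF-8"));
    JsonParser jsonParser = new JsonParser();
    reader.beginArray();
    while (reader.hasNext()) {
        // Gson pulls only enough bytes from the stream to parse one array element
        JsonObject jsonObject = jsonParser.parse(reader).getAsJsonObject();
        jsonObject.addProperty("Somename", "Somedata");
        processContext.output(jsonObject.toString());
    }
    reader.close();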
