Google Dataflow: How to parse a large file containing a valid JSON array from FileIO.ReadableFile

Time: 2021-07-19 15:35:28

In my pipeline, the FileIO.readMatches() transform reads a large JSON file (around 300-400 MB) containing a valid JSON array and passes a FileIO.ReadableFile object to the next transform. My task is to read each JSON object from that array, add new properties to it, and output it to the next transform.


At the moment my code to parse the JSON file looks like this:


        // file is a FileIO.ReadableFile object
        InputStream bis = new ByteArrayInputStream(file.readFullyAsBytes());
        // I'm using the Gson library to parse JSON
        JsonReader reader = new JsonReader(new InputStreamReader(bis, "UTF-8"));
        JsonParser jsonParser = new JsonParser();
        // step into the top-level array so hasNext() iterates over its elements
        reader.beginArray();
        while (reader.hasNext()) {
            JsonObject jsonObject = jsonParser.parse(reader).getAsJsonObject();
            jsonObject.addProperty("Somename", "Somedata");
            // processContext is a ProcessContext object;
            // emitting the object as a String is an assumption
            processContext.output(jsonObject.toString());
        }
        reader.endArray();

In this case the whole content of the file is held in memory, which risks a java.lang.OutOfMemoryError. I'm looking for a way to read the JSON objects one by one without keeping the whole file in memory. A possible solution is to use the open() method of FileIO.ReadableFile, which returns a ReadableByteChannel, but I'm not sure how to use that channel to read exactly one JSON object at a time.


Updated solution

This is my updated solution, which reads the file line by line:


    ReadableByteChannel readableByteChannel = null;
    InputStream inputStream = null;
    BufferedReader bufferedReader = null;
    try {
        // file is a FileIO.ReadableFile
        readableByteChannel = file.open();
        inputStream = Channels.newInputStream(readableByteChannel);
        bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (line.length() > 1) {
                // my final output should contain both filename and line
                processContext.output(fileName + line);
            }
        }
    } catch (IOException ex) {
        logger.error("Exception during reading the file", ex);
    } finally {
        // closing the reader also closes the wrapped stream and channel
        if (bufferedReader != null) {
            try {
                bufferedReader.close();
            } catch (IOException ignored) {
                // nothing useful can be done if close() fails
            }
        }
    }

This solution doesn't work with Dataflow running on an n1-standard-1 machine, where it throws java.lang.OutOfMemoryError: GC overhead limit exceeded, but it works correctly on an n1-standard-2 machine.
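
For reference, the line-by-line approach above can be written more compactly with try-with-resources, which closes the reader, the wrapped stream, and the channel automatically. This is only a sketch using the JDK: `ChannelLines`, `forEachLine`, and `channelOf` are hypothetical names, the `Consumer` stands in for `processContext.output`, and `channelOf` is an in-memory stand-in for `file.open()`.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class ChannelLines {
    /** Reads the channel line by line and passes fileName + line to sink for each line longer than one character. */
    static void forEachLine(ReadableByteChannel channel, String fileName, Consumer<String> sink) {
        // try-with-resources closes the reader, which closes the wrapped stream and the channel
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(channel), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // same filter as in the question: skips one-character lines such as "[" or "]"
                if (line.length() > 1) {
                    sink.accept(fileName + line);
                }
            }
        } catch (IOException e) {
            // rethrown unchecked here; the question's code logs the exception instead
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical stand-in for FileIO.ReadableFile.open(), backed by an in-memory string. */
    static ReadableByteChannel channelOf(String content) {
        return Channels.newChannel(
                new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
    }
}
```

Note that only one line is buffered at a time, so this keeps memory low only if each JSON object sits on its own line; a file written as a single long line would still be read in one piece.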


1 answer



ReadableByteChannel is a Java NIO API, introduced in Java 1.4. Java provides a way to convert it to an InputStream: InputStream bis = Channels.newInputStream(file.open()); I believe this is the only change you need to make.
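
To illustrate why wrapping the channel avoids loading the whole file, here is a minimal, dependency-free sketch that pulls the top-level objects out of a JSON array one at a time. It is not the Gson approach from the question: `JsonArrayStreamer` and `forEachObject` are hypothetical names, the `Consumer` stands in for the per-object work (parsing, adding a property, calling `processContext.output`), and the sketch assumes well-formed input whose array elements are all objects. In the pipeline, Gson's `JsonReader` built on the same stream, with `beginArray()` and `hasNext()`, would likely be the more robust choice.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class JsonArrayStreamer {
    /**
     * Passes each top-level object of a JSON array to sink as raw text.
     * Only the object currently being read is buffered in memory.
     */
    static void forEachObject(ReadableByteChannel channel, Consumer<String> sink) {
        try (Reader reader = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(channel), StandardCharsets.UTF_8))) {
            StringBuilder current = new StringBuilder();
            int depth = 0;            // nesting depth of {} and [] inside the current object
            boolean inString = false; // true while inside a JSON string literal
            boolean escaped = false;  // true right after a backslash inside a string
            int c;
            while ((c = reader.read()) != -1) {
                char ch = (char) c;
                if (depth == 0) {
                    // between elements: skip '[', ',', ']' and whitespace until an object starts
                    if (ch == '{') {
                        depth = 1;
                        current.append(ch);
                    }
                    continue;
                }
                current.append(ch);
                if (inString) {
                    if (escaped) {
                        escaped = false;
                    } else if (ch == '\\') {
                        escaped = true;
                    } else if (ch == '"') {
                        inString = false;
                    }
                } else if (ch == '"') {
                    inString = true;
                } else if (ch == '{' || ch == '[') {
                    depth++;
                } else if (ch == '}' || ch == ']') {
                    if (--depth == 0) {
                        // a complete top-level object has been read; hand it over and reset
                        sink.accept(current.toString());
                        current.setLength(0);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Called as `JsonArrayStreamer.forEachObject(file.open(), json -> ...)`, each `json` string holds one element of the array, so peak memory is bounded by the largest single object rather than by the file size.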
