In my pipeline FileIO.readMatches()
transform reads big JSON file(around 300-400MB) with a valid JSON array and returns FileIO.ReadableFile
object to the next transform. My task is to read each JSON object from that JSON array, add new properties and output to the next transform.
在我的管道中,FileIO.readMatches()转换使用有效的JSON数组读取大JSON文件(大约300-400MB),并将FileIO.ReadableFile对象返回到下一个转换。我的任务是从该JSON数组中读取每个JSON对象,添加新属性并输出到下一个转换。
At the moment my code to parse the JSON file looks like this:
目前,我解析JSON文件的代码如下所示:
// file is a FileIO.ReadableFile object
InputStream bis = new ByteArrayInputStream(file.readFullyAsBytes());
// Im using gson library to parse JSON
JsonReader reader = new JsonReader(new InputStreamReader(bis, "UTF-8"));
JsonParser jsonParser = new JsonParser();
reader.beginArray();
while (reader.hasNext()) {
JsonObject jsonObject = jsonParser.parse(reader).getAsJsonObject();
jsonObject.addProperty("Somename", "Somedata");
// processContext is a ProcessContext object
processContext.output(jsonObject.toString());
}
reader.close();
In this case the whole content of the file will be in my memory which brings options to get java.lang.OutOfMemoryError. Im searching for solution to read one by one all JSON objects without keeping the whole file in my memory. Possible solution is to use method open()
from object FileIO.ReadableFile
which returns ReadableByteChannel
channel but Im not sure how to use that channel to read specifically one JSON object from that channel.
在这种情况下,文件的整个内容将在我的内存中,它带来了获取java.lang.OutOfMemoryError的选项。我正在寻找解决方案,逐个读取所有JSON对象,而不将整个文件保存在我的内存中。可能的解决方案是使用来自对象FileIO.ReadableFile的方法open(),它返回ReadableByteChannel通道,但我不知道如何使用该通道从该通道专门读取一个JSON对象。
Updated solution This is my updated solution which reads the file line by line
更新的解决方案这是我更新的解决方案,它逐行读取文件
ReadableByteChannel readableByteChannel = null;
InputStream inputStream = null;
BufferedReader bufferedReader = null;
try {
// file is a FileIO.ReadableFile
readableByteChannel = file.open();
inputStream = Channels.newInputStream(readableByteChannel);
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String line;
while ((line = bufferedReader.readLine()) != null) {
if (line.length() > 1) {
// my final output should contain both filename and line
processContext.output(fileName + file);
}
}
} catch (IOException ex) {
logger.error("Exception during reading the file: {}", ex);
} finally {
IOUtils.closeQuietly(bufferedReader);
IOUtils.closeQuietly(inputStream);
}
I see that this solution doesnt work with Dataflow running on n1-standard-1 machine and throws java.lang.OutOfMemoryError: GC overhead limit exceeded
exception and works correctly on n1-standard-2 machine.
我看到这个解决方案不适用于在n1-standard-1机器上运行的Dataflow并抛出java.lang.OutOfMemoryError:GC开销限制超出异常并且在n1-standard-2机器上正常工作。
1 个解决方案
#1
0
ReadableByteChannel
is a java NIO API, introduced in Java 7. Java provides a way to convert it to an InputStream: InputStream bis = Channels.newInputStream(file.open());
- I believe this is the only change you need to make.
ReadableByteChannel是Java NIO API,在Java 7中引入.Java提供了一种将其转换为InputStream的方法:InputStream bis = Channels.newInputStream(file.open()); - 我相信这是你需要做的唯一改变。
#1
0
ReadableByteChannel
is a java NIO API, introduced in Java 7. Java provides a way to convert it to an InputStream: InputStream bis = Channels.newInputStream(file.open());
- I believe this is the only change you need to make.
ReadableByteChannel是Java NIO API,在Java 7中引入.Java提供了一种将其转换为InputStream的方法:InputStream bis = Channels.newInputStream(file.open()); - 我相信这是你需要做的唯一改变。