I'm looking for a way to read ENTIRE files so that every file will be read entirely to a single String. I want to pass a pattern of JSON text files on gs://my_bucket/*/*.json, have a ParDo then process each and every file entirely.
我正在寻找一种方法来读取整个文件,以便每个文件都可以完全读取到一个字符串。我想在gs://my_bucket/*/*.json上传递JSON文本文件的模式,然后让ParDo完全处理每个文件。
What's the best approach to it?
什么是最好的方法?
2 个解决方案
#1
3
I am going to give the most generally useful answer, even though there are special cases [1] where you might do something different.
我将给出最普遍有用的答案,即使有特殊情况[1]你可能会做一些不同的事情。
I think what you want to do is to define a new subclass of FileBasedSource
and use Read.from(<source>)
. Your source will also include a subclass of FileBasedReader
; the source contains the configuration data and the reader actually does the reading.
我想你想要做的是定义一个新的FileBasedSource子类并使用Read.from(
I think a full description of the API is best left to the Javadoc, but I will highlight the key override points and how they relate to your needs:
我认为API的完整描述最好留给Javadoc,但我将重点介绍关键覆盖点以及它们与您的需求之间的关系:
-
FileBasedSource#isSplittable()
you will want to override and returnfalse
. This will indicate that there is no intra-file splitting. - FileBasedSource#isSplittable()你想要覆盖并返回false。这将表明没有文件内拆分。
-
FileBasedSource#createForSubrangeOfFile(String, long, long)
you will override to return a sub-source for just the file specified. - FileBasedSource#createForSubrangeOfFile(String,long,long)您将覆盖以仅返回指定文件的子源。
-
FileBasedSource#createSingleFileReader()
you will override to produce aFileBasedReader
for the current file (the method should assume it is already split to the level of a single file). - 您将覆盖FileBasedSource#createSingleFileReader()以生成当前文件的FileBasedReader(该方法应该假设它已经拆分为单个文件的级别)。
To implement the reader:
实现读者:
-
FileBasedReader#startReading(...)
you will override to do nothing; the framework will already have opened the file for you, and it will close it. - FileBasedReader #startReading(...)你将覆盖什么都不做;框架已经为您打开了文件,它将关闭它。
-
FileBasedReader#readNextRecord()
you will override to read the entire file as a single element. - FileBasedReader#readNextRecord()您将覆盖以将整个文件作为单个元素读取。
[1] One example easy special case is when you actually have a small number of files, you can expand them prior to job submission, and they all take the same amount of time to process. Then you can just use Create.of(expand(<glob>))
followed by ParDo(<read a file>)
.
[1]一个简单的例子就是当你实际拥有少量文件时,你可以在提交作业之前扩展它们,并且它们都需要相同的时间来处理。然后你可以使用Create.of(展开(
#2
1
Was looking for similar solution myself. Following Kenn's recommendations and few other references such as XMLSource.java, created the following custom source which seems to be working fine.
我自己在寻找类似的解决方案。遵循Kenn的建议和其他一些参考文献,如XMLSource.java,创建了以下自定义源代码,似乎工作正常。
I am not a developer so if anyone has suggestions on how to improve it, please feel free to contribute.
我不是开发人员,所以如果有人有关于如何改进它的建议,请随时贡献。
public class FileIO {
// Match TextIO.
public static Read.Bounded<KV<String,String>> readFilepattern(String filepattern) {
return Read.from(new FileSource(filepattern, 1));
}
public static class FileSource extends FileBasedSource<KV<String,String>> {
private String filename = null;
public FileSource(String fileOrPattern, long minBundleSize) {
super(fileOrPattern, minBundleSize);
}
public FileSource(String filename, long minBundleSize, long startOffset, long endOffset) {
super(filename, minBundleSize, startOffset, endOffset);
this.filename = filename;
}
// This will indicate that there is no intra-file splitting.
@Override
public boolean isSplittable(){
return false;
}
@Override
public boolean producesSortedKeys(PipelineOptions options) throws Exception {
return false;
}
@Override
public void validate() {}
@Override
public Coder<KV<String,String>> getDefaultOutputCoder() {
return KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of());
}
@Override
public FileBasedSource<KV<String,String>> createForSubrangeOfFile(String fileName, long start, long end) {
return new FileSource(fileName, getMinBundleSize(), start, end);
}
@Override
public FileBasedReader<KV<String,String>> createSingleFileReader(PipelineOptions options) {
return new FileReader(this);
}
}
/**
* A reader that should read entire file of text from a {@link FileSource}.
*/
private static class FileReader extends FileBasedSource.FileBasedReader<KV<String,String>> {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
private ReadableByteChannel channel = null;
private long nextOffset = 0;
private long currentOffset = 0;
private boolean isAtSplitPoint = false;
private final ByteBuffer buf;
private static final int BUF_SIZE = 1024;
private KV<String,String> currentValue = null;
private String filename;
public FileReader(FileSource source) {
super(source);
buf = ByteBuffer.allocate(BUF_SIZE);
buf.flip();
this.filename = source.filename;
}
private int readFile(ByteArrayOutputStream out) throws IOException {
int byteCount = 0;
while (true) {
if (!buf.hasRemaining()) {
buf.clear();
int read = channel.read(buf);
if (read < 0) {
break;
}
buf.flip();
}
byte b = buf.get();
byteCount++;
out.write(b);
}
return byteCount;
}
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
this.channel = channel;
}
@Override
protected boolean readNextRecord() throws IOException {
currentOffset = nextOffset;
ByteArrayOutputStream buf = new ByteArrayOutputStream();
int offsetAdjustment = readFile(buf);
if (offsetAdjustment == 0) {
// EOF
return false;
}
nextOffset += offsetAdjustment;
isAtSplitPoint = true;
currentValue = KV.of(this.filename,CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), buf.toByteArray()));
return true;
}
@Override
protected boolean isAtSplitPoint() {
return isAtSplitPoint;
}
@Override
protected long getCurrentOffset() {
return currentOffset;
}
@Override
public KV<String,String> getCurrent() throws NoSuchElementException {
return currentValue;
}
}
}
#1
3
I am going to give the most generally useful answer, even though there are special cases [1] where you might do something different.
我将给出最普遍有用的答案,即使有特殊情况[1]你可能会做一些不同的事情。
I think what you want to do is to define a new subclass of FileBasedSource
and use Read.from(<source>)
. Your source will also include a subclass of FileBasedReader
; the source contains the configuration data and the reader actually does the reading.
我想你想要做的是定义一个新的FileBasedSource子类并使用Read.from(
I think a full description of the API is best left to the Javadoc, but I will highlight the key override points and how they relate to your needs:
我认为API的完整描述最好留给Javadoc,但我将重点介绍关键覆盖点以及它们与您的需求之间的关系:
-
FileBasedSource#isSplittable()
you will want to override and returnfalse
. This will indicate that there is no intra-file splitting. - FileBasedSource#isSplittable()你想要覆盖并返回false。这将表明没有文件内拆分。
-
FileBasedSource#createForSubrangeOfFile(String, long, long)
you will override to return a sub-source for just the file specified. - FileBasedSource#createForSubrangeOfFile(String,long,long)您将覆盖以仅返回指定文件的子源。
-
FileBasedSource#createSingleFileReader()
you will override to produce aFileBasedReader
for the current file (the method should assume it is already split to the level of a single file). - 您将覆盖FileBasedSource#createSingleFileReader()以生成当前文件的FileBasedReader(该方法应该假设它已经拆分为单个文件的级别)。
To implement the reader:
实现读者:
-
FileBasedReader#startReading(...)
you will override to do nothing; the framework will already have opened the file for you, and it will close it. - FileBasedReader #startReading(...)你将覆盖什么都不做;框架已经为您打开了文件,它将关闭它。
-
FileBasedReader#readNextRecord()
you will override to read the entire file as a single element. - FileBasedReader#readNextRecord()您将覆盖以将整个文件作为单个元素读取。
[1] One example easy special case is when you actually have a small number of files, you can expand them prior to job submission, and they all take the same amount of time to process. Then you can just use Create.of(expand(<glob>))
followed by ParDo(<read a file>)
.
[1]一个简单的例子就是当你实际拥有少量文件时,你可以在提交作业之前扩展它们,并且它们都需要相同的时间来处理。然后你可以使用Create.of(展开(
#2
1
Was looking for similar solution myself. Following Kenn's recommendations and few other references such as XMLSource.java, created the following custom source which seems to be working fine.
我自己在寻找类似的解决方案。遵循Kenn的建议和其他一些参考文献,如XMLSource.java,创建了以下自定义源代码,似乎工作正常。
I am not a developer so if anyone has suggestions on how to improve it, please feel free to contribute.
我不是开发人员,所以如果有人有关于如何改进它的建议,请随时贡献。
public class FileIO {
// Match TextIO.
public static Read.Bounded<KV<String,String>> readFilepattern(String filepattern) {
return Read.from(new FileSource(filepattern, 1));
}
public static class FileSource extends FileBasedSource<KV<String,String>> {
private String filename = null;
public FileSource(String fileOrPattern, long minBundleSize) {
super(fileOrPattern, minBundleSize);
}
public FileSource(String filename, long minBundleSize, long startOffset, long endOffset) {
super(filename, minBundleSize, startOffset, endOffset);
this.filename = filename;
}
// This will indicate that there is no intra-file splitting.
@Override
public boolean isSplittable(){
return false;
}
@Override
public boolean producesSortedKeys(PipelineOptions options) throws Exception {
return false;
}
@Override
public void validate() {}
@Override
public Coder<KV<String,String>> getDefaultOutputCoder() {
return KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of());
}
@Override
public FileBasedSource<KV<String,String>> createForSubrangeOfFile(String fileName, long start, long end) {
return new FileSource(fileName, getMinBundleSize(), start, end);
}
@Override
public FileBasedReader<KV<String,String>> createSingleFileReader(PipelineOptions options) {
return new FileReader(this);
}
}
/**
* A reader that should read entire file of text from a {@link FileSource}.
*/
private static class FileReader extends FileBasedSource.FileBasedReader<KV<String,String>> {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
private ReadableByteChannel channel = null;
private long nextOffset = 0;
private long currentOffset = 0;
private boolean isAtSplitPoint = false;
private final ByteBuffer buf;
private static final int BUF_SIZE = 1024;
private KV<String,String> currentValue = null;
private String filename;
public FileReader(FileSource source) {
super(source);
buf = ByteBuffer.allocate(BUF_SIZE);
buf.flip();
this.filename = source.filename;
}
private int readFile(ByteArrayOutputStream out) throws IOException {
int byteCount = 0;
while (true) {
if (!buf.hasRemaining()) {
buf.clear();
int read = channel.read(buf);
if (read < 0) {
break;
}
buf.flip();
}
byte b = buf.get();
byteCount++;
out.write(b);
}
return byteCount;
}
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
this.channel = channel;
}
@Override
protected boolean readNextRecord() throws IOException {
currentOffset = nextOffset;
ByteArrayOutputStream buf = new ByteArrayOutputStream();
int offsetAdjustment = readFile(buf);
if (offsetAdjustment == 0) {
// EOF
return false;
}
nextOffset += offsetAdjustment;
isAtSplitPoint = true;
currentValue = KV.of(this.filename,CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), buf.toByteArray()));
return true;
}
@Override
protected boolean isAtSplitPoint() {
return isAtSplitPoint;
}
@Override
protected long getCurrentOffset() {
return currentOffset;
}
@Override
public KV<String,String> getCurrent() throws NoSuchElementException {
return currentValue;
}
}
}