Example of using FileBasedSource in Google Cloud Dataflow

Date: 2021-04-03 15:26:27

Can someone post a simple example of subclassing FileBasedSource? I'm new to Google Dataflow and very inexperienced with Java. My goal is to read files while including line numbers as a key, or to skip lines based on the line number.
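
Here is a plain-Java sketch of the two goals in the question: keying each line by its line number, and skipping lines by number. It assumes that a single reader sees the whole file in order, starting from line 1. That assumption matters. When FileBasedSource splits a file into byte ranges, a reader that starts mid-file knows its byte offset but not how many lines come before it. Absolute line numbers are therefore only directly available when the file is read as one unsplit range. The class and method names are hypothetical, and no Dataflow API is used.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: keys lines by 1-based line number and skips lines before a threshold. */
public class LineNumberSketch {
  // Assumes 'lines' holds the whole file in order, starting from line 1 (no splitting).
  static List<Map.Entry<Long, String>> keyByLineNumber(Iterable<String> lines, long firstLineToKeep) {
    List<Map.Entry<Long, String>> out = new ArrayList<>();
    long lineNumber = 0;
    for (String line : lines) {
      lineNumber++;
      if (lineNumber < firstLineToKeep) {
        continue; // e.g. drops a header line when firstLineToKeep == 2
      }
      out.add(new SimpleImmutableEntry<>(lineNumber, line));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("header", "alpha", "beta");
    for (Map.Entry<Long, String> e : keyByLineNumber(lines, 2)) {
      System.out.println(e.getKey() + "\t" + e.getValue()); // prints "2\talpha", then "3\tbeta"
    }
  }
}
```

For a small local file, `java.nio.file.Files.readAllLines(path)` could supply the lines. In a pipeline, each entry would typically become a `KV<Long, String>`.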

1 answer

#1

The implementation of XMLSource is a good starting point for understanding how FileBasedSource works. For your reader, you'll likely want something like the following. Here readNextLine() reads to the end of the current line and returns the number of bytes it consumed, and that value is used to update the offset:

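// Method of a FileBasedSource reader subclass. Needs java.io.{ByteArrayOutputStream, IOException}
// and java.nio.channels.{ReadableByteChannel, SeekableByteChannel}. Assumes a long field nextOffset
// and a helper readNextLine(ByteArrayOutputStream) that reads through the next '\n' and returns
// the number of bytes consumed.
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
  if (getCurrentSource().getMode() == FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE) {
    // A subrange starting mid-file may begin inside a line. That partial line belongs to the
    // previous range, so skip it.
    if (getCurrentSource().getStartOffset() > 0) {
      // The channel is expected to be positioned at the start offset already.
      SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
      // Step back one byte and read through the next newline. If that byte is itself '\n',
      // only it is consumed, so reading starts exactly at the start offset.
      seekChannel.position(seekChannel.position() - 1);
      nextOffset = seekChannel.position() + readNextLine(new ByteArrayOutputStream());
    }
  }
}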
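
The split rule in startReading() above can be checked without any Dataflow dependencies. This sketch uses a byte array in place of the channel, and a StringBuilder in place of the ByteArrayOutputStream. The helper names are hypothetical, and it assumes ASCII text. It also follows the usual convention that a line belongs to the byte range in which it starts. Reading two adjacent ranges should therefore return every line exactly once, wherever the split falls.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone illustration of the line-boundary rule for byte-range splits. */
public class SplitBoundarySketch {
  // Reads from pos through the next '\n' (or end of data), appending the line without its
  // newline to out. Returns the number of bytes consumed, including the newline.
  static int readNextLine(byte[] data, int pos, StringBuilder out) {
    int consumed = 0;
    while (pos + consumed < data.length) {
      byte b = data[pos + consumed];
      consumed++;
      if (b == '\n') {
        break;
      }
      out.append((char) b); // assumes ASCII for brevity
    }
    return consumed;
  }

  // Returns the lines owned by the byte range [start, end): those that start before 'end'.
  static List<String> readRange(byte[] data, int start, int end) {
    int offset = start;
    if (start > 0) {
      // Step back one byte and skip through the next newline, as startReading() does.
      offset = (start - 1) + readNextLine(data, start - 1, new StringBuilder());
    }
    List<String> lines = new ArrayList<>();
    while (offset < end && offset < data.length) {
      StringBuilder line = new StringBuilder();
      offset += readNextLine(data, offset, line);
      lines.add(line.toString());
    }
    return lines;
  }

  public static void main(String[] args) {
    byte[] data = "alpha\nbeta\ngamma\n".getBytes(StandardCharsets.US_ASCII);
    // Split at byte 8, which falls in the middle of "beta".
    System.out.println(readRange(data, 0, 8));           // [alpha, beta]
    System.out.println(readRange(data, 8, data.length)); // [gamma]
  }
}
```

The first range finishes "beta" because that line starts before byte 8. The second range skips the tail of "beta" because the byte before its start is not a newline.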

I've created a gist with the complete LineIO example, which may be simpler than XMLSource.