Apache Beam TextIO.Read与行号

时间:2022-12-18 15:35:49

Is it possible to get access to line numbers with the lines read into the PCollection from TextIO.Read? For context here, I'm processing a CSV file and need access to the line number for a given line.

是否可以通过从TextIO.Read读入PCollection的行来访问行号?对于此处的上下文,我正在处理CSV文件,并且需要访问给定行的行号。

If not possible through TextIO.Read it seems like it should be possible using some kind of custom Read or transform, but I'm having trouble figuring out where to begin.

如果不可能通过TextIO.Read似乎应该可以使用某种自定义读取或转换,但我无法弄清楚从哪里开始。

1 个解决方案

#1


1  

You can use FileIO to read the file manually, where you can determine the line number when you read from the ReadableFile.

您可以使用FileIO手动读取文件,从ReadableFile读取时可以确定行号。

A simple solution can look as follows:

一个简单的解决方案可以如下所示:

p
    .apply(FileIO.match().filepattern("/file.csv"))
    .apply(FileIO.readMatches())
    .apply(FlatMapElements
            .into(strings())
            .via((FileIO.ReadableFile f) -> {
                List<String> result = new ArrayList<>();
                try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                    int lineNr = 1;
                    String line = br.readLine();
                    while (line != null) {
                        result.add(lineNr + "," + line);
                        line = br.readLine();
                        lineNr++;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Error while reading", e);
                }
                return result;
            }));

The solution above just prepends the line number to each input line.

上面的解决方案只是为每个输入行添加行号。

#1


1  

You can use FileIO to read the file manually, where you can determine the line number when you read from the ReadableFile.

您可以使用FileIO手动读取文件,从ReadableFile读取时可以确定行号。

A simple solution can look as follows:

一个简单的解决方案可以如下所示:

p
    .apply(FileIO.match().filepattern("/file.csv"))
    .apply(FileIO.readMatches())
    .apply(FlatMapElements
            .into(strings())
            .via((FileIO.ReadableFile f) -> {
                List<String> result = new ArrayList<>();
                try (BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))) {
                    int lineNr = 1;
                    String line = br.readLine();
                    while (line != null) {
                        result.add(lineNr + "," + line);
                        line = br.readLine();
                        lineNr++;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Error while reading", e);
                }
                return result;
            }));

The solution above just prepends the line number to each input line.

上面的解决方案只是为每个输入行添加行号。