I am producing an output CSV file with a Dataflow and I'd like to write a header row and then append all of my output to that. How can I go about doing that?
我正在生成一个带有数据流的输出CSV文件,我想写一个标题行,然后将所有输出附加到该文件。我怎么能这样做呢?
My flow roughly looks like this:
我的流程大致如下:
PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
// Produce CSV output
}
})).apply(TextIO.Write.named("WriteData").to(options.getOutput()));
Thanks!
谢谢!
2 个解决方案
#1
2
The proper way to do this is to use the Custom Sink API. You can derive from FileBasedSink
, FileBasedWriteOperation
and FileBasedWriter
(for example, you can name your classes respectively CSVSink
, CSVWriteOperation
and CSVWriter
).
正确的方法是使用Custom Sink API。您可以从FileBasedSink,FileBasedWriteOperation和FileBasedWriter派生(例如,您可以分别为CSVSink,CSVWriteOperation和CSVWriter命名您的类)。
The only non-trivial logic will be in CSVWriter
. Write the header in its writeHeader()
and write the CSV entries in write()
.
唯一的非平凡逻辑将在CSVWriter中。在其writeHeader()中写入标头,并在write()中写入CSV条目。
Then you can use the sink in the pipeline using the Write.to()
transform, in place of TextIO
you're currently using.
然后,您可以使用Write.to()转换在管道中使用接收器,代替您当前使用的TextIO。
A good example built into the SDK is the XML sink.
SDK中内置的一个很好的例子是XML接收器。
#2
0
You can output the header from DoFn#startBundle()
.
您可以从DoFn #startBundle()输出标头。
#1
2
The proper way to do this is to use the Custom Sink API. You can derive from FileBasedSink
, FileBasedWriteOperation
and FileBasedWriter
(for example, you can name your classes respectively CSVSink
, CSVWriteOperation
and CSVWriter
).
正确的方法是使用Custom Sink API。您可以从FileBasedSink,FileBasedWriteOperation和FileBasedWriter派生(例如,您可以分别为CSVSink,CSVWriteOperation和CSVWriter命名您的类)。
The only non-trivial logic will be in CSVWriter
. Write the header in its writeHeader()
and write the CSV entries in write()
.
唯一的非平凡逻辑将在CSVWriter中。在其writeHeader()中写入标头,并在write()中写入CSV条目。
Then you can use the sink in the pipeline using the Write.to()
transform, in place of TextIO
you're currently using.
然后,您可以使用Write.to()转换在管道中使用接收器,代替您当前使用的TextIO。
A good example built into the SDK is the XML sink.
SDK中内置的一个很好的例子是XML接收器。
#2
0
You can output the header from DoFn#startBundle()
.
您可以从DoFn #startBundle()输出标头。