To write an SCollection of TableRow or String to Google Cloud Storage (GCS), I use saveAsTableRowJsonFile or saveAsTextFile, respectively. Both of these methods ultimately use
private[scio] def pathWithShards(path: String) = path.replaceAll("\\/+$", "") + "/part"
which forces file names to start with "part". Is saveAsCustomOutput the only way to write sharded files with a custom file name?
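To make the effect of that helper concrete, here is a minimal standalone sketch. It copies the one-line body quoted above into a hypothetical object named PathWithShardsDemo (the object name and the sample paths are assumptions for illustration, and the private[scio] modifier is dropped so it can run outside Scio):

```scala
object PathWithShardsDemo {
  // Same body as the Scio helper quoted in the question:
  // strip any trailing slashes, then append "/part".
  def pathWithShards(path: String): String =
    path.replaceAll("\\/+$", "") + "/part"

  def main(args: Array[String]): Unit = {
    // Trailing slashes are removed before "/part" is appended.
    println(pathWithShards("gs://foo/bar///")) // gs://foo/bar/part
    // A would-be file prefix such as "bar_" is treated as a directory name.
    println(pathWithShards("gs://foo/bar_"))   // gs://foo/bar_/part
  }
}
```

Whatever path is passed in ends up as a directory, and the file name prefix inside it is always "part", which is why a custom prefix cannot be set through these two methods.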
1 Answer
I had to do it with Beam code via saveAsCustomOutput:
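import com.google.api.client.json.JsonFactory
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.{FileBasedSink, TextIO}
import org.apache.beam.sdk.util.Transport

val jsonFactory: JsonFactory = Transport.getJsonFactory
val outputPath = "gs://foo/bar_" // file prefix will be bar_

@BigQueryType.toTable()
case class Clazz(foo: String, bar: String)

val collection: SCollection[Clazz] = ....

collection
  .map(Clazz.toTableRow)     // convert each record to a TableRow
  .map(jsonFactory.toString) // serialize each TableRow as one JSON string
  .saveAsCustomOutput(name = "CustomWrite", TextIO.write()
    .to(outputPath)          // Beam uses this as the output file name prefix
    .withSuffix("")          // no extra file name suffix
    .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)) // gzip the output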