I would like to persist the data from Kafka topic to google storage using Data flow.
我想使用数据流将Kafka主题的数据保存到谷歌存储。
I have written a sample code on local, it is working all good.
我在本地编写了一个示例代码,它运行良好。
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<Long, String>read().withBootstrapServers("localhost:9092").withTopic("my-topic")
.withKeyDeserializer(LongDeserializer.class).withValueDeserializer(StringDeserializer.class))
.apply(Window
.<KafkaRecord<Long, String>>
into(FixedWindows.of(Duration.standardMinutes(1)))
)
.apply(FlatMapElements.into(TypeDescriptors.strings())
.via((KafkaRecord<Long, String> line) -> TextUtil.splitLine(line.getKV().getValue())))
.apply(Filter.by((String word) -> StringUtils.isNotEmpty(word))).apply(Count.perElement())
.apply(MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> lineCount) -> lineCount.getKey() + ": " + lineCount.getValue()))
.apply(TextIO.write().withWindowedWrites().withNumShards(1)
.to("resources/temp/wc-kafka-op/wc"));
p.run().waitUntilFinish();
}
Above code works perfectly. But I would like to save output of each window in separate directory.
以上代码完美无缺。但我想将每个窗口的输出保存在单独的目录中。
e.g. {BasePath}/{Window}/{prefix}{Suffice}
例如{基本路径} / {窗口} / {前缀} {}足够
I could not able to get it working.
我无法让它发挥作用。
1 个解决方案
#1
1
TextIO supports windowedWrites, when you can specify how the name is derived. See JavaDoc.
当您可以指定名称的派生方式时,TextIO支持windowedWrites。请参阅JavaDoc。
#1
1
TextIO supports windowedWrites, when you can specify how the name is derived. See JavaDoc.
当您可以指定名称的派生方式时,TextIO支持windowedWrites。请参阅JavaDoc。