I have seen many questions on this topic, but I am still having problems writing to GCS. I am reading messages from a Pub/Sub topic and trying to write them to GCS. I have referred to this link, but I couldn't find IOChannelUtils in the latest Beam packages.
// Durations used below (the original snippet uses these constants without defining them).
static final Duration ONE_MIN = Duration.standardMinutes(1);
static final Duration ONE_DAY = Duration.standardDays(1);
static final Duration TEN_SECONDS = Duration.standardSeconds(10);
PCollection<String> details = pipeline
    .apply(PubsubIO.readStrings().fromTopic("projects/<project>/topics/sampleTopic"));
// Same key for every element, so GroupByKey puts each window pane's elements into one group.
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
  @Override
  public String apply(String s) {
    return "constant";
  }
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
    .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN))
        .withAllowedLateness(ONE_DAY)
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(10))
            .withLateFirings(AfterFirst.of(
                AfterPane.elementCountAtLeast(10),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
        .discardingFiredPanes())
    .apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
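To make the FixedWindows.of(ONE_MIN) step concrete: each element is assigned to the one-minute window that contains its event timestamp, i.e. [start, start + size) where start is the timestamp rounded down to a multiple of the window size. The class below is a plain-Java toy model of that assignment, assuming a zero window offset; it is not Beam code, and FixedWindowSketch, windowStart and windowEnd are hypothetical names used only for illustration.

```java
/**
 * Toy model (not Beam code) of fixed-window assignment with zero offset:
 * an element with timestamp t lands in [t - (t mod size), t - (t mod size) + size).
 */
final class FixedWindowSketch {

  /** Inclusive start of the fixed window that contains timestampMillis. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // Math.floorMod keeps the rounding correct for negative timestamps as well.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Exclusive end of the same window. */
  static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }
}
```

For example, with a one-minute size, an element stamped 125,000 ms (2 min 5 s) falls into the window [120,000 ms, 180,000 ms). With the trigger in the question, that window then emits a pane early for every 10 elements, once when the watermark passes 180,000 ms, and again for late data.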
I adapted the windowing code above from several similar questions on Stack Overflow. As I understand it, TextIO does support writing an unbounded PCollection via withWindowedWrites and withNumShards.
Reference: the Stack Overflow question "Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn".
However, I don't understand how to do this.
I am trying to write to GCS as follows.
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
    StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write()
    .to("gs://<bucket>/topicfile")
    .withWindowedWrites()
    .withFilenamePolicy(policy)
    .withNumShards(4));
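The question passes DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, which, as far as I know, is "-SSSSS-of-NNNNN". The sketch below is a plain-Java approximation of how such a template expands, not Beam's implementation (ShardTemplateSketch and expand are hypothetical names): every run of S becomes the zero-padded shard index, every run of N becomes the zero-padded shard count, and the run length sets the padding width.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy expansion (not Beam code) of a shard template such as "-SSSSS-of-NNNNN". */
final class ShardTemplateSketch {
  // A run of 'S' stands for the shard index, a run of 'N' for the number of shards.
  private static final Pattern SHARD_RUN = Pattern.compile("S+|N+");

  static String expand(String prefix, String template, String suffix, int shard, int numShards) {
    StringBuilder out = new StringBuilder(prefix);
    Matcher m = SHARD_RUN.matcher(template);
    int last = 0;
    while (m.find()) {
      out.append(template, last, m.start());        // literal text between runs, e.g. "-of-"
      int width = m.end() - m.start();              // run length = zero-padding width
      int value = template.charAt(m.start()) == 'S' ? shard : numShards;
      out.append(String.format("%0" + width + "d", value));
      last = m.end();
    }
    out.append(template, last, template.length());  // any trailing literal text
    return out.append(suffix).toString();
  }
}
```

With the prefix from the question and withNumShards(4), shard 3 would be named gs://<bucket>/topicfile-00003-of-00004. The template itself carries no window or pane information, so on its own it cannot tell apart the files written for different windows; that is the job of a custom FilenamePolicy like the one in answer #2.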
I don't have enough reputation to comment on those questions on Stack Overflow, so I am asking this as a separate question.
2 answers
#1 (score: 3)
Check out this Pub/Sub to GCS pipeline, which provides a complete example of writing windowed files to GCS.
#2 (score: 2)
I was able to solve this issue by changing the windowing as shown below:
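PCollection<String> streamedDataWindows = streamedData.apply(
    Window.<String>into(new GlobalWindows())
        // Fire 30 s (processing time) after the first element of each pane, forever.
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes());
streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites()
    .withNumShards(1).withFilenamePolicy(new PerWindowFiles()));
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
  @Override
  public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
    // OVERRIDE THE FILE NAME CREATION (not shown in this answer; must build and return a ResourceId)
  }
  @Override
  public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context, String extension) {
    // Required abstract method; only windowed writes are used in this pipeline.
    throw new UnsupportedOperationException("Unsupported.");
  }
}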
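The answer leaves the body of windowedFilename out. One possible naming scheme, sketched below in plain Java as an assumption (not the answerer's code; PerWindowNameSketch and fileName are hypothetical names), combines a window label, the pane index and the shard number. With GlobalWindows and a repeating trigger, every firing belongs to the same window, so a name built from the window alone would be reused by each firing; adding the pane index keeps the files apart.

```java
/** Hypothetical per-window, per-pane file naming scheme (plain Java, not Beam code). */
final class PerWindowNameSketch {

  static String fileName(String prefix, String windowLabel, long paneIndex,
                         int shard, int numShards, String extension) {
    // The pane index distinguishes successive firings of the same window,
    // e.g. the single global window used with Repeatedly.forever(...).
    return String.format("%s-%s-pane-%d-%05d-of-%05d%s",
        prefix, windowLabel, paneIndex, shard, numShards, extension);
  }
}
```

For instance, fileName("topicfile", "global", 2, 0, 1, ".txt") returns topicfile-global-pane-2-00000-of-00001.txt. Inside windowedFilename, the inputs would presumably come from the WindowedContext (in Beam 2.0 I believe getWindow(), getPaneInfo().getIndex(), getShardNumber() and getNumShards()), and the name would be turned into a ResourceId with outputDirectory.resolve(name, StandardResolveOptions.RESOLVE_FILE); treat these call names as assumptions to verify against your Beam version.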
Although this solved it, I am still not sure about the windowing concept here. I will add more details as and when I find them. If anyone understands this better, please add more details. Thanks
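On the windowing concept: with GlobalWindows there is only one window, so the trigger alone decides when output is produced. Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(30 s)) starts a timer when the first element of a pane arrives, emits the pane 30 seconds of processing time later, and then starts over; discardingFiredPanes() means each pane contains only the elements that arrived since the previous firing. The toy simulation below models that behavior in plain Java. It is not Beam code, the class and method names are hypothetical, and it assumes an element arriving exactly at the firing instant goes into the next pane.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam code) of a repeated processing-time trigger in the global window. */
final class ProcessingTimeTriggerSketch {

  /** One fired pane: when it fired and the arrival times it contained. */
  static final class Pane {
    final long fireTimeSec;
    final List<Long> arrivalsSec;

    Pane(long fireTimeSec, List<Long> arrivalsSec) {
      this.fireTimeSec = fireTimeSec;
      this.arrivalsSec = arrivalsSec;
    }
  }

  /** Groups ascending processing-time arrivals (seconds) into discarding panes. */
  static List<Pane> firePanes(long[] arrivalsSec, long delaySec) {
    List<Pane> panes = new ArrayList<>();
    int i = 0;
    while (i < arrivalsSec.length) {
      long fireAt = arrivalsSec[i] + delaySec; // timer starts at the pane's first element
      List<Long> contents = new ArrayList<>();
      while (i < arrivalsSec.length && arrivalsSec[i] < fireAt) {
        contents.add(arrivalsSec[i]);
        i++;
      }
      panes.add(new Pane(fireAt, contents)); // discarding: the next pane starts empty
    }
    return panes;
  }
}
```

For arrivals at 0, 5, 29, 30 and 70 seconds with a 30-second delay, this yields three panes that fire at 30, 60 and 100 seconds and hold 3, 1 and 1 elements. Each pane is written out separately by the windowed write, which is why the file names need to differ per pane.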