I'm creating sliding time windows 20 seconds long every 5 seconds from batched log data:
rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

# set timestamp field used for windowing and set 20 second long window every 5 seconds
ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                | 'set window' >> beam.WindowInto(window.SlidingWindows(20, 5)))

# extract only user id and relevant data, group and process
rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row:
                      [(str(row['user_id']), [row['data1'], row['data2'], row['data3']])])
                          | 'group by user id' >> beam.GroupByKey()
                          | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))
How can I access the timestamp information for each window in Python? (There is an answer for Java in "How to get the max timestamp of the current sliding window", but I don't know how to translate it into Python.) Ideally I want the end time of each window rather than the max or min timestamp of the data within the window.
1 Answer
I went to the link you provided.
Note: window=beam.DoFn.WindowParam is the parameter mentioned on the page you linked.
The window end time is the end attribute of the window object that Beam passes in when a process() parameter defaults to beam.DoFn.WindowParam. In Python, you can access it like this:
Define your DoFn:
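import apache_beam as beam

class BuildRecordFn(beam.DoFn):
    def __init__(self):
        super(BuildRecordFn, self).__init__()

    def process(self, element, window=beam.DoFn.WindowParam):
        # Beam injects the window that the current element belongs to
        # window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        # element is a tuple here, so append the window end time to it
        return [element + (window_end,)]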
Then use it like this:
# format_result must be defined before the pipeline that references it
def format_result(xs):
    ys = [str(x) for x in xs]
    return ','.join(ys)

lines = p | ReadFromText(known_args.input)
counts = (
    lines
    | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
    | 'AddEventTimestamp' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
    | 'ExtractObjectID' >> beam.Map(lambda elem: (elem["objectID"]))
    | 'FixedWindow' >> beam.WindowInto(
        beam.window.FixedWindows(60*1))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum)
    | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
    | 'Format' >> beam.Map(format_result)
    | WriteToText(known_args.output)
)
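The example above uses fixed windows, whereas the question uses SlidingWindows(20, 5). With sliding windows each element belongs to several overlapping windows (size / period = 4 here), so a DoFn placed after the grouping step is called once per key per window, and window.end differs on each call. The following is a minimal plain-Python sketch of that assignment arithmetic, with no Beam dependency. The function name sliding_window_bounds is hypothetical, and the sketch assumes integer timestamps in seconds and a window offset of 0.

```python
def sliding_window_bounds(timestamp, size=20, period=5, offset=0):
    """Return the (start, end) bounds of every sliding window containing timestamp.

    Windows are half-open intervals [start, end); a new one starts every
    `period` seconds. This is an illustration, not Beam's own code.
    """
    # Start of the latest window that begins at or before the timestamp.
    last_start = timestamp - (timestamp - offset) % period
    # Step back one period at a time while the window still covers the timestamp.
    return [(start, start + size)
            for start in range(last_start, last_start - size, -period)]


# An event at t=12s falls into four windows, ending at 30, 25, 20 and 15.
for start, end in sliding_window_bounds(12):
    print(start, end)
```

Under these assumptions, the end value is the exclusive upper bound of the window, which is what the question asks for, rather than the largest timestamp of the data inside it.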