I have a Structured Streaming dataframe. For each row, I want to take the value in one column (in this case a timestamp like 1525670700), use it to query another, static dataframe for the timestamp nearest to that value, and merge the single row returned by that query into the streaming dataframe, e.g.:
# Dropped the trailing .show(): it prints the row and returns None, so nothing would be assigned
my_row_to_merge_df = weather_df.filter(weather_df.timestamp_unix > 1525670700).sort(col('timestamp_unix').asc()).limit(1)
With that I want to do something like:
joined_df = streaming_df.merge(function_to_return_row_from_other_df(col('timestamp')))
The problem with using a join in this case is that the timestamps might not match perfectly, but it is okay if they are a few seconds or even a few minutes different.
Given that, I am not sure what kind of operation I could use to get this result.
1 Answer
You can round a timestamp down to the granularity you need with:
# Assuming timestamp is in seconds
timestamp_rounded = int(timestamp - (timestamp % int(minutes * 60)))
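Here "minutes" is the interval you want to round to. For example, with minutes = 5 the timestamp is rounded down to the start of its 5-minute interval. Applying the same rounding to the timestamp columns of both dataframes gives them a common key that an ordinary equality join can use.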
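As a quick check of the arithmetic, here is a minimal plain-Python sketch of the rounding above. The helper name `round_down` is made up for illustration and is not part of PySpark.

```python
def round_down(timestamp: int, minutes: int) -> int:
    """Round a Unix timestamp (in seconds) down to a multiple of `minutes`."""
    bucket_seconds = int(minutes * 60)
    return int(timestamp - (timestamp % bucket_seconds))


# 1525670834 is 134 seconds past a 5-minute boundary.
print(round_down(1525670834, 5))  # 1525670700
# A timestamp that already sits on the boundary is unchanged.
print(round_down(1525670700, 5))  # 1525670700
```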
You can add the new column as shown here: How to do mathematical operation with two column in dataframe using pyspark
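To show why a rounded column makes the join possible, here is a small sketch in plain Python rather than Spark. The rows, the `temp_c` and `value` fields, and the 5-minute bucket size are all invented for the example. In PySpark the same idea would presumably be an equality join between the streaming and static dataframes on the rounded column.

```python
BUCKET_SECONDS = 5 * 60  # assumed tolerance: 5-minute buckets


def bucket(timestamp: int) -> int:
    """Round a Unix timestamp in seconds down to the start of its bucket."""
    return timestamp - (timestamp % BUCKET_SECONDS)


# Hypothetical static weather rows, indexed by their rounded timestamp.
weather_rows = [
    {"timestamp_unix": 1525670710, "temp_c": 18.5},
    {"timestamp_unix": 1525671020, "temp_c": 18.9},
]
weather_by_bucket = {bucket(row["timestamp_unix"]): row for row in weather_rows}

# Hypothetical streaming rows whose timestamps do not match exactly.
streaming_rows = [
    {"timestamp": 1525670834, "value": "a"},
    {"timestamp": 1525671299, "value": "b"},
]

# Equality join on the rounded key; rows with no matching bucket are dropped.
joined = [
    {**row, **weather_by_bucket[bucket(row["timestamp"])]}
    for row in streaming_rows
    if bucket(row["timestamp"]) in weather_by_bucket
]
print(joined)
```

Note that this matches rows that fall in the same bucket, which is not always the nearest timestamp: two timestamps a few seconds apart but on opposite sides of a bucket boundary will not be joined.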