I am a beginner with Apache Spark. I am trying to run a streaming job that receives some data, converts it into a DataFrame, and runs some processing such as joins, removing duplicates, etc. Now I have to cache this processed data so that I can append it to the next DStream (using some union/join) and do the processing again.
I tried using dataframe.cache() to cache and reuse it in the next stream batch.
For example, suppose df is the DataFrame built from each RDD of the DStream:
dstream.foreachRDD { rdd =>
  val df = sqlContext.createDataFrame(rdd, schema)  // current batch as a DataFrame
  val combined = df.unionAll(processed)             // processed: var holding the previous batch's result
  combined.registerTempTable("TableScheme")
  sqlContext.sql("...")                             // perform inner join and some other processing
  processed = combined
  processed.cache()
}
When we perform DataFrame.cache() or DataFrame.persist(), are we caching the actual data or the DAG/transformations applied? When the second stream batch comes, my program exits with:
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
org.apache.spark.SparkException: Attempted to use BlockRDD[1] at socketTextStream after its blocks have been removed!
1 Answer
The cache() functionality performs the processing defined in the DAG up to the cache() call and stores the data appropriately so that computations are not repeated, should more than one action be performed.
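As a minimal sketch of that behavior (the sqlContext and the toy data here are assumptions, not taken from the question):

val df = sqlContext.range(0, 1000000)                  // assumed SQLContext from the app
val counts = df.groupBy(df("id") % 10).count().cache() // marks the result for caching
counts.count()   // first action: executes the DAG and populates the cache
counts.show()    // second action: served from the cache, no recomputation

Note that caching does not cut the DataFrame loose from its lineage: if the cached data was never materialized by an action, or gets evicted, Spark must recompute it from the source, and in your case that source is the batch's BlockRDD, which Spark Streaming has already cleaned up by the time the next batch runs. That is what the exception is reporting.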
What you want is some persistence across stream batches.
There are several ways to do that:
- streamingContext.remember(Minutes(5)) holds on to data from previous batches for Minutes(5) amount of time
- windowing moves a fixed time window across the data, allowing you to perform operations on more than one batch of data
- updateStateByKey and mapWithState provide mechanisms to maintain and transform state across batches

The approach you choose will depend largely on your use-case; a sketch combining these options follows.
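For illustration, here is a minimal sketch of these mechanisms under assumed names (a socket source on localhost:9999, a local checkpoint directory, and 10-second batches; none of these come from the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("state-sketch")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.remember(Minutes(5))                 // keep RDDs of previous batches around for 5 minutes
ssc.checkpoint("/tmp/spark-checkpoint")  // required by the stateful operations below

val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Windowing: each result covers the last 60 seconds of batches, sliding every 10 seconds.
val windowedCounts = words.reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))

// updateStateByKey: carry a running count per key across all batches; `state`
// is the value kept from previous batches, `newValues` are this batch's increments.
val runningCounts = words.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

windowedCounts.print()
runningCounts.print()

ssc.start()
ssc.awaitTermination()

mapWithState (available since Spark 1.6) is a more efficient alternative to updateStateByKey, since it only updates keys that actually appear in the current batch.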