spark streaming中使用checkpoint

时间:2023-01-20 10:41:36

从官方的Programming Guides中看到的

我理解streaming中的checkpoint有两种,一种指的是metadata的checkpoint,用于恢复你的streaming;一种是rdd的checkpoint的;下面的代码指的是第一种:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
} // Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ... // Start the context
context.start()
context.awaitTermination()