Spark Streaming使用checkpoint容错

时间:2022-10-15 20:46:29

一、checkpotin说明

  流媒体应用程序必须全天候运行,因此必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具有恢复能力。为了做到这一点,Spark Streaming需要检查点足够的信息到容错存储系统,以便从故障中恢复。有两种类型的检查点数据。
 

  1. 元数据检查点 - 将定义流式计算的信息保存到HDFS等容错存储中。这用于从运行流应用程序的驱动程序的节点的故障中恢复(稍后详细讨论)。元数据包括:

    • 配置 - 用于创建流应用程序的配置。
    • DStream操作 - 定义流应用程序的一组DStream操作。
    • 不完整的批次 - 作业排队但尚未完成的批次。
  2. 数据检查点 - 保存已生成的RDDs至可靠的存储。这在某些有状态转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。

必须使用检查点的场景

  1. 有状态转换的使用 - 如果在应用程序中使用updateStateByKey或者reduceByKeyAndWindow(与反函数),则必须提供检查点目录以允许周期性RDD检查点。
  2. 从运行应用程序的驱动程序的故障中恢复 - 元数据检查点用于通过进度信息进行恢复。

总而言之,元数据检查点主要用于从驱动程序故障中恢复,而数据或RDD检查点对于使用有状态转换时的基本功能是必需的。

二、实现

  1. driver容错
package spark.examples.streaming.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Application {

private static final Logger logger = LoggerFactory.getLogger(Application.class);

private static final String CHECKPOINT_PATH = "checkpoint";

public static void main(String[] args) {
Function0<JavaStreamingContext> createContextFunc = () -> createContext(CHECKPOINT_PATH);

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_PATH, createContextFunc);

jssc.start();

try {
jssc.awaitTermination();
} catch (InterruptedException e) {
logger.error("等待处理停止错误!", e);
}

}

private static JavaStreamingContext createContext(String checkpointDirectory) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]").setAppName("TEST");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
jssc.checkpoint(checkpointDirectory);

//DStream所有转化或行动操作

return jssc;
}

}

说明:
1.处理的逻辑必须写在createContextFunc 函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错

17/07/13 10:57:10 INFO WriteAheadLogManager  for Thread: Reading from the logs:  
hdfs://master:9000/csw/tmp/test3/receivedBlockMetadata/log-1499914584482-1499914644482
17/07/13 10:57:10 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4735d6e5 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:323)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

2. dstream容错

dstream.checkpoint(checkpointInterval)

三、总结

 虽然数据可靠性得到保障了,但是要谨慎的设置刷新间隔,这可能会影响吞吐量,因为每隔固定时间都要向文件系统上写入checkpoint数据,spark streaming官方推荐对于需要RDD检查点的有状态转换,缺省间隔是至少10秒的批间隔的倍数。可以通过使用设置 dstream.checkpoint(checkpointInterval)。通常情况下,一个DStream的5到10个滑动间隔的检查点间隔是一个很好的尝试。