SparkStreaming HA高可用性

时间:2021-01-02 01:32:37

1、UpdateStateByKey、windows等有状态的操作时,自动进行checkpoint,必须设置checkpoint目录,数据保留一份在容错的文件系统中,一旦内存中的数据丢失,可以从文件系统中读取数据,不需要重新计算。

SparkStreaming.checkpoint("hdfs://ip:port/checkpoint")

 

2、Driver高可用性

一、Java版

第一次在创建和启动StreamingContext的时候,那么将持续不断的产生实时计算的元数据并写入检查点,如果driver节点挂掉,那么可以让Spark集群自动重启集群(必须使用yarn cluster模式,spark-submit --deploy-mode cluster --supervise ....),然后继续运行计算程序,没有数据丢失。

private static void testDriverHA() {

  final Streaming checkpointDir="hdfs://ip:port/checkpoint";

  JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {

  @Override
  public JavaStreamingContext create() {
    SparkConf conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AdClickRealTimeStatSpark");

    JavaStreamingContext jssc = new JavaStreamingContext(
          conf, Durations.seconds(5));
    jssc.checkpoint(checkpointDir);

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST,
      ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
    String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
    String[] kafkaTopicsSplited = kafkaTopics.split(",");
    Set<String> topics = new HashSet<String>();
    for(String kafkaTopic : kafkaTopicsSplited) {
      topics.add(kafkaTopic);
    }

    JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
      jssc,
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topics);

    JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
      filterByBlacklist(adRealTimeLogDStream);
    generateDynamicBlacklist(filteredAdRealTimeLogDStream);
    JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
      filteredAdRealTimeLogDStream);
    calculateProvinceTop3Ad(adRealTimeStatDStream);
    calculateAdClickCountByWindow(adRealTimeLogDStream);
    return jssc;
    }
  };

  JavaStreamingContext context = JavaStreamingContext.getOrCreate(
  checkpointDir, contextFactory);
  context.start();
  context.awaitTermination();

}

二、Scala版

 

package cn.piesat.spark

import org.apache.kafka.clients.consumer.{ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafka {
private val brokers = "hadoop01:9092"
private val topics = "lj01"
private val checkPointPath = "hdfs://hadoop01:9000/sparkStreaming/kafka6"

def main(args: Array[String]): Unit = {
val spark = getSparkSession()
val streamingContext = StreamingContext.getOrCreate(checkPointPath, () => {
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
ssc.checkpoint(checkPointPath)
val kafkaInputStream = getKafkaInputStream(ssc)
val result = kafkaInputStream.map(x => x.value()).flatMap(x => {
x.split(" ").map(x => {
(x, 1)
})
}).reduceByKey(_ + _)
result.print()
ssc
})
streamingContext.start()
streamingContext.awaitTermination()
}

def getSparkSession(): SparkSession = {
SparkSession.builder()
.appName("kafka_test")
.master("local[4]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}

def getKafkaInputStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val topicArray = topics.split(",").toList
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "lj00",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicArray, kafkaParams)
)
}

}

注意:对streaming的操作逻辑必须写在StreamingContext.getOrCreate()方法里,因为若是第二次恢复时则执行方法里的逻辑!!!

 

3、实现RDD高可用性,启动WAL预写日志机制

sparkStreaming从原理上说,是通过receiver来进行数据接收的,接收到时的数据,会被划分成一个个的block,block会被组合成batch,针对一个batch,会创建一个Rdd,启动一个job来执行定义的算子操作。receiver主要接收到数据,那么就会立即将数据写入一份到时容错文件系统(比如hdfs)上的checkpoint目录中的,一份磁盘文件中去,作为数据的冗余副本。

  SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("AdClickRealTimeStatSpark")
    .set("spark.streaming.receiver.writeAheadLog.enable","true");