068 mapWithState函数的讲解

时间:2022-05-28 11:21:24

1.问题

  主要是updateStateByKey的问题

  有的值不需要变化的时候,还会再打印出来。

  每个批次的数据都会出现,如果向redis保存更新的时候,会把不需要变化的值也更新,这个不是我们需要的,我们只需要更新有变化的那部分值。

  

2.mapWithState

  有一个注解,说明是实验性质的。

  068 mapWithState函数的讲解

3.程序

 package com.stream.it
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} object MapWithState {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingMapWithState")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
// 路径对应的文件夹不能存在
ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir45254") /**
*
* @param key DStream的key数据类型
* @param values DStream的value数据类型
* @param state 是StreamingContext中之前该key的状态值
* @return
*/
def mappingFunction(key: String, values: Option[Int], state: State[Long]): (String, Long) = {
// 获取之前状态的值
val preStateValue = state.getOption().getOrElse(0L)
// 计算出当前值
val currentStateValue = preStateValue + values.getOrElse(0) // 更新状态值
state.update(currentStateValue) // 返回结果
(key, currentStateValue)
}
val spec = StateSpec.function[String, Int, Long, (String, Long)](mappingFunction _) val kafkaParams = Map(
"group.id" -> "streaming-kafka-001231",
"zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
"auto.offset.reset" -> "smallest"
)
val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
ssc, // 给定SparkStreaming上下文
kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
topics, // 给定读取对应topic的名称以及读取数据的线程数量
StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
).map(_._2) val resultWordCount: DStream[(String, Long)] = dstream
.filter(line => line.nonEmpty)
.flatMap(line => line.split(" ").map((_, 1)))
.reduceByKey(_ + _)
.mapWithState(spec) resultWordCount.print() // 这个也是打印数据 // 启动开始处理
ssc.start()
ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
}
}

4.效果

  068 mapWithState函数的讲解

  在控制台上再写入一个hadoop:

    说明了,在新写入的时候,才会出现,但是以前的数据还在。

  068 mapWithState函数的讲解

5.说明

  因为存在checkpoint,在重新后,以前的数据还在,新加入数据后,会在原有的基础上进行更新,上面的第二幅图就是这样产生的。