SparkStreaming算子reduceByKeyAndWindow的使用

时间:2022-01-19 20:47:24

reduceByKeyAndWindow这个算子也是lazy的,它用来计算一个区间里面的数据,如下图:

SparkStreaming算子reduceByKeyAndWindow的使用

截图自官网,例如每个方块代表5秒钟,上面的虚线框住的是3个窗口就是15秒钟,这里的15秒钟就是窗口的长度,其中虚线到实线移动了2个方块表示10秒钟,这里的10秒钟就表示每隔10秒计算一次窗口长度的数据

举个例子: 如下图

SparkStreaming算子reduceByKeyAndWindow的使用

我是这样理解的:如果这里是使用窗口函数计算wordcount 在第一个窗口(虚线窗口)计算出来(aa, 1)(bb,3)(cc,1)当到达时间10秒后窗口移动到实线窗口,就会计算这个实线窗口中的单词,这里就为(bb,1)(cc,2)(aa,1)

附上程序:

注意:窗口滑动长度和窗口长度一定要是SparkStreaming微批处理时间的整数倍,不然会报错.

package cn.lijie.kafka

import cn.lijie.MyLog
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
* User: lijie
* Date: 2017/8/8
* Time: 14:04
*/
object SparkWindowDemo {

val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(x => {
(x._1, x._2.sum + x._3.getOrElse(0))
})
}

def main(args: Array[String]): Unit = {
MyLog.setLogLeavel(Level.WARN)
val conf = new SparkConf().setMaster("local[2]").setAppName("window")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01")
val ds = ssc.socketTextStream("192.168.80.123", 9999)
//Seconds(5)表示窗口的宽度 Seconds(3)表示多久滑动一次(滑动的时间长度)
val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))
// 窗口长度和滑动的长度一致,那么类似于每次计算自己批量的数据,用updateStateByKey也可以累计计算单词的wordcount 这里只是做个是实验
// val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true)
re.print()
ssc.start()
ssc.awaitTermination()
}
}