注意:
- window length(窗口长度) - 窗口的持续时间(图3)
- sliding interval(滑动时间间隔) - 执行窗口操作的时间间隔(图2)
这两个参数必须是DStream批处理的时间间隔的倍数
示例
- 代码
object WindowApp {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "D:\\hadoop")
// 准备工作
val conf = new SparkConf().setMaster("local[2]").setAppName("WindowApp")
val ssc = new StreamingContext(conf, Seconds(10))
// 业务逻辑处理
val lines = ssc.socketTextStream("hadoop000", 9999) // 1 thread
lines.flatMap(_.split(","))
.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),
Seconds(10), Seconds(5))
.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
-
如上:val ssc = new StreamingContext(conf, Seconds(5))
这个批处理时间设置为10s会报错 -
原因:是下面设置滑动时间是5s(应该是10s的倍数)
.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),
Seconds(10), Seconds(5))