博客地址: http://blog.csdn.net/yueqian_zhu/
首先看一个最简单的例子,了解大致的样子:
object NetworkWordCount {本小节主要介绍StreamingContext的构造
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
class StreamingContext private[streaming] (
sc_ : SparkContext,
cp_ : Checkpoint,
batchDur_ : Duration
)
一、API:
1、cp_为null
def this(sparkContext: SparkContext, batchDuration: Duration)2、方法内部也是通过conf自动创建一个sparkContext,cp_为null
def this(conf: SparkConf, batchDuration: Duration)3、conf由默认的和参数部分组合而成,cp_为nulldef this( master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())4、从path目录下读取checkpoint的信息来重建streamingContext,也就不需要sparkContext和Duration参数def this(path: String, hadoopConf: Configuration)def this(path: String)//hadoopConf使用默认的hadoop配置文件自动构造5、使用存在的sparkContext和checkpoint路径来构造def this(path: String, sparkContext: SparkContext)6、需要注意的是,streamingContext对象内部有一个getOrCreate方法,指明如果在checkpointPath路径下读取不到,则调用creatingFunc创建新的streamingContextdef getOrCreate( checkpointPath: String, creatingFunc: () => StreamingContext, hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false ): StreamingContext二、StreamingContext主要的构造逻辑(checkpoint暂不讨论)1、构造一个graph: DStreamGraph
作用于DStream上的operation分成两类 1. Transformation,2. Output 表示将输出结果。DStreamGraph 有输入就要有输出,如果没有输出,则前面所做的所有动作全部没有意义,那么如何将这些输入和输出绑定起来呢?这个问题的解决就依赖于DStreamGraph,DStreamGraph记录输入的Stream和输出的Stream。
2、构造一个JobScheduler
JobScheduler内部会构造一个jobGenerator,它用于按我们设定的批处理间隔产生job3、状态设置为INITIALIZED下一节介绍上面例子中的operation部分