之前学习过Spark Core源码,接下来一段时间研究一下Spark Streaming相关的内容!下面就从最简单的Streaming程序开始作为入口点(Receiver模式),程序代码如下:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by Daxin on 2017/8/4.
*/
object StreamMain {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("socketStream")
//基于Reciver模式,所以线程数目需要大于1,否则只能接受数据无法处理数据
conf.setMaster("local[*]") //如果设置conf.setMaster("local[1]")的话,将会没有线程负责计算
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(2))
//TODO 最终创建一个SocketInputDStream返回
val line = ssc.socketTextStream("node", 9999)
val result = line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
//TODO 核心代码入口点,在org.apache.spark.streaming.StreamingContext.start方法中启动JobScheduler,开启接受数据并进行计算
ssc.start()
ssc.awaitTermination()
}
}
如上程序可分为三部分,第一部分是模板代码创建ssc,第二部分创建DStream以及算子计算,第三部分启动ssc。下文将主要这三部分
1:StreamingContext的创建
SparkContext是Spark批处理的的入口,此处StreamingContext也正是Spark Streaming的入口。StreamingContext提供创建DStream的函数,以及调用其start方法启动流计算。
主构造器如下:
class StreamingContext private[streaming] (
_sc: SparkContext,
_cp: Checkpoint,
_batchDur: Duration
) extends Logging
辅助构造器:
/**
* Create a StreamingContext using an existing SparkContext.
* @param sparkContext existing SparkContext
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName a name for your job, to display on the cluster web UI
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(
master: String,
appName: String,
batchDuration: Duration,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()) = {
this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
null, batchDuration)
}
/**
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
* @param hadoopConf Optional, configuration object if necessary for reading from
* HDFS compatible filesystems
*/
def this(path: String, hadoopConf: Configuration) =
this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)
/**
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(path, SparkHadoopUtil.get.conf)
/**
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
* @param path Path to the directory that was specified as the checkpoint directory
* @param sparkContext Existing SparkContext
*/
def this(path: String, sparkContext: SparkContext) = {
this(
sparkContext,
CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
null)
}
通过构造器发现都需要一个SparkContext对象,可以更加清楚认识到Spark Streaming是在Spark Core基础之上构建的流计算引擎。
2:DStream的创建
StreamingContext类似于SparkContext同样提供了一些创建抽象数据集的函数,例如:
/**
* Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
本小节主要分析一下基于Receiver方式的创建DStream过程,主要是以Socket方式创建DStream为主要流程进行分析。接下来看一下DStream的继承结构:
注意:启动关于Flume相关的DStream是来至于Spark Streaming的Flume依赖库中的类,需要自行引入依赖!
通过上图我们可以看到基于Recevier方式创建的DStream都是继承至ReceiverInputDStream(例如:Direct方式的Kafka DStream是继承至InputDStream)。
接下来看一下如下代码创建DStream过程:
val line = ssc.socketTextStream("node", 9999)
/** * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @see [[socketStream]] */ def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] = withNamedScope("socket text stream") { socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } /** * Creates an input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes it interpreted as object using the given * converter. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param converter Function to convert the byte stream to objects * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ def socketStream[T: ClassTag]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel ): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) }
private[streaming]
class SocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)//后期org.apache.spark.streaming.scheduler.ReceiverTracker#launchReceivers中进行调用getReceiver()方法}}再看一下SocketReceiver的源码:
private[streaming]
class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
private var socket: Socket = _
def onStart() {
logInfo(s"Connecting to $host:$port")
try {
socket = new Socket(host, port)
} catch {
case e: ConnectException =>
restart(s"Error connecting to $host:$port", e)
return
}
logInfo(s"Connected to $host:$port")
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }//调用进行数据数据接受
}.start()
}
def onStop() {
// in case restart thread close it twice
synchronized {
if (socket != null) {
socket.close()
socket = null
logInfo(s"Closed socket to $host:$port")
}
}
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
try {
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next())//重点部分
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
onStop()
}
}
}
到此处就完成了socketTextStream的功能。到此处由于org.apache.spark.streaming.dstream.SocketInputDStream#getReceiver方法还没有被调用,所以还没有创建SocketReceiver,因此此处ssc.socketTextStream返回的只是一个数据的表示,并不代表真实的数据,只有当ssc.start()调用之后才会真正进行数据接受和处理!
3:StreamingContext的启动(StreamingContext.start())
在分析start过程之前先简述一下几个相关组件的功能,组件分别如下:a: JobScheduler调度流作业在spark上运行
b:使用JobGenerator产生spark job并在线程池中运行
StreamingContext.start()源码如下:
/**
* Start the execution of the streams.
*
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
//TODO JobScheduler
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
logDebug("Adding shutdown hook") // force eager creation of logger
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}如下是 JobScheduler.start()代码:
def start(): Unit = synchronized {这里面核心代码是:ReceiverTracker和JobGenerator的创建初始化以及启动!
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start()
//TODO ReceiverTracker初始化
receiverTracker = new ReceiverTracker(ssc)
//TODO 主要用来统计输入数据集的统计信息,用来进行UI显示和监控
inputInfoTracker = new InputInfoTracker(ssc)
val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
case _ => null
}
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
executorAllocClient,
receiverTracker,
ssc.conf,
ssc.graph.batchDuration.milliseconds,
clock)
executorAllocationManager.foreach(ssc.addStreamingListener)
//TODO 重点:ReceiverTracker启动,负责数据接受和处理
receiverTracker.start()
//TODO 重点:jobGenerator启动,负责产生Spark Job
jobGenerator.start()
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
}ReceiverTracker的启动(如下代码运行在Driver):
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {//在RpcEnv环境下(其实就是NettyRpcEnv)创建ReceiverTracker的endPoint// endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //TODO 启动Receiver if (!skipReceiverLaunch) launchReceivers() logInfo("ReceiverTracker started") trackerState = Started } }launchReceivers()方法实现:
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
* <br><br>
* org.apache.spark.streaming.scheduler.ReceiverTracker#launchReceivers()
*/
private def launchReceivers(): Unit = {
//TODO 获取ReceiverInputDStream中的Receiver
val receivers = receiverInputStreams.map { nis =>
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
}
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
//TODO endpoint为ReceiverTracker的Endpoint引用
//TODO 就是给自己发送消息
endpoint.send(StartAllReceivers(receivers))
}org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receive对StartAllReceivers消息的处理:
override def receive: PartialFunction[Any, Unit] = {
// Local messages
//TODO 处理消息
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
//TODO 启动receiver
startReceiver(receiver, executors)
}
case RestartReceiver(receiver) =>
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
// Clear "scheduledLocations" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
state = ReceiverState.INACTIVE, scheduledLocations = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
startReceiver(receiver, scheduledLocations)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
case UpdateReceiverRateLimit(streamUID, newRate) =>
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
// Remote messages
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
}
最后完成ReceiverSupervisorImpl的创建:/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
//TODO 完成ReceiverSupervisorImpl的创建
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
}