spark发行版笔记10

时间:2023-03-08 16:41:35
spark发行版笔记10

感谢DT大数据梦工厂支持提供技术支持,DT大数据梦工厂专注于Spark发行版定制。

本期概览:

数据接收全生命周期的思考

大数据处理框架中,最重要的就是性能,性能是排在前面的。其次再考虑其他的。因为数据量大,一不小心的多余的操作,几分钟,十几分钟就过去了。

根据一般的架构设计原则,接收数据和存储数据是不同的对象来完成的。

Spark Streaming数据接收全生命周期可以看成是一个MVC模式,ReceiverSupervisor相当于是控制器(c),Receiver(v)

首先启动的是ReceiverTracker。

开启通信并且启动receiver执行线程spark发行版笔记10
Start a receiver along with its scheduled executors

spark发行版笔记10

spark发行版笔记10

Get the receivers from the ReceiverInputDStreams, distributes them to the

* worker nodes as a parallel collection, and runs them.

spark发行版笔记10
要注意的是Receiver是可序列化的,要进行通信

spark发行版笔记10

值得注意的是ReceiverSupervisor与ReceiverTracker的消息通信的主要代码如下

spark发行版笔记10

spark发行版笔记10

/** Divides received data records into data blocks for pushing in BlockManager. */

spark发行版笔记10

这里的调用onStart()方法要先于Receiver的onStart()方法,因为Receiver的onStart()方法要用到BlockGenerator等在这里的调用onStart()初始化的值

spark发行版笔记10

* Note: Do not create BlockGenerator instances directly inside receivers. Use

* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.

spark发行版笔记10

这里生动的说明了一个BlockGenerator只服务于一个DStream

spark发行版笔记10

Receiver接收数据应该是非阻塞式的,所以应该单独开启一条线程来执行

spark发行版笔记10

默认情况 下,每200毫秒产生一个Block,并且在生产环境中有个最佳实践,那就是性能调优的时候spark.streaming.blockInterval最好不要低于50毫秒,因为一般情况下产生的碎片小文件过多,过多的句柄占据内存或者磁盘空间,造成性能下降,当然,根据具体的不同的数据的流入的速度不同,最优化的设置多少时间的数据合并为一个Block是不同的。要根据具体情况具体分析。原则上是产生的文件大小在速度和句柄数量之间平衡。

spark发行版笔记10

每隔10毫秒就push数据到磁盘(Block)

spark发行版笔记10

发送消息启动所有的receivers

spark发行版笔记10

/**

* Start a receiver along with its scheduled executors 将调度的receiver启动

*/

private def startReceiver(

receiver: Receiver[_],

scheduledLocations: Seq[TaskLocation]): Unit = {

def shouldStartReceiver: Boolean = {

// It's okay to start when trackerState is Initialized or Started

!(isTrackerStopping || isTrackerStopped)

}

val receiverId = receiver.streamId

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

return

}

val checkpointDirOption = Option(ssc.checkpointDir)

val serializableHadoopConf =

new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

// Function to start the receiver on the worker node

val startReceiverFunc: Iterator[Receiver[_]] => Unit =

(iterator: Iterator[Receiver[_]]) => {

if (!iterator.hasNext) {

throw new SparkException(

"Could not start receiver as object not found.")

}

if (TaskContext.get().attemptNumber() == 0) {

val receiver = iterator.next()

assert(iterator.hasNext == false)

val supervisor = new ReceiverSupervisorImpl(

receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

supervisor.start()

supervisor.awaitTermination()

} else {

// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.

}

}