15、Spark Streaming源码解读之No Receivers彻底思考

时间:2021-03-09 20:56:25
在前几期文章里 讲了带Receiver的Spark Streaming 应用的相关源码解读, 但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 
1. 更强的控制*度 
2. 语义一致性 
其实No Receivers的方式更符合我们读取数据,操作数据的思路的。因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式。 如果要操作数据来源,肯定要有一个封装器,这个封装器一定是RDD类型。 以直接访问Kafka中的数据为例,看一下源码中直接读写Kafka中数据的例子代码:
    
    
   
   
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
 
StreamingExamples.setStreamingLogLevels()
 
val Array(brokers, topics) = args
 
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
 
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
 
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
 
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}

Spark streaming 会将数据源封装成一个RDD,也就是KafkaRDD:

     
     
    
    
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
private[kafka]
class KafkaRDD[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
R: ClassTag] private[spark] (
sc: SparkContext,
kafkaParams: Map[String, String],
val offsetRanges: Array[OffsetRange], //该RDD的数据偏移量
leaders: Map[TopicAndPartition, (String, Int)],
messageHandler: MessageAndMetadata[K, V] => R
) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges

可以看到KafkaRDD 混入了HasOffsetRanges,它是一个trait:

    
    
   
   
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

其中OffsetRange,标识了RDD的数据的主题、分区、开始偏移量和结束偏移量:

    
    
   
   
inal class OffsetRange private(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long) extends Serializable

回到KafkaRDD,看一下KafkaRDD的getPartitions方法:

    
    
   
   
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}

返回KafkaRDDPartition:

    
    
   
   
private[kafka]
class KafkaRDDPartition(
val index: Int,
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long,
val host: String,
val port: Int
) extends Partition {
/** Number of messages this partition refers to */
def count(): Long = untilOffset - fromOffset
}

KafkaRDDPartition清晰的描述了数据的具体位置,每个 KafkaRDDPartition分区的数据交给 KafkaRDD的compute方法计算:

    
    
   
   
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
new KafkaRDDIterator(part, context)
}
}

KafkaRDD的compute方法返回了KafkaIterator对象:

    
    
   
   
private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends NextIterator[R] {
 
context.addTaskCompletionListener{ context => closeIfNeeded() }
 
log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 
val kc = new KafkaCluster(kafkaParams)
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[V]]
val consumer = connectLeader
var requestOffset = part.fromOffset
var iter: Iterator[MessageAndOffset] = null
    //..................
}

KafkaIterator中创建了一个KakfkaCluster对象用于与Kafka集群进行交互,获取数据。

回到开头的例子,我们使用  KafkaUtils . createDirectStream 创建了InputDStream:

    
    
   
   
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

看一下createDirectStream源码:

    
    
   
   
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
//创建KakfaCluster对象
val kc = new KafkaCluster(kafkaParams)
//更具kc的信息获取数据偏移量
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}

首先通过KafkaCluster从Kafka集群获取信息,创建DirectKafkaInputDStream对象返回

DirectKafkaInputDStream的compute方法源码:
    
    
   
   
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    //计算最近的数据终止偏移量
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    //利用数据的偏移量创建KafkaRDD
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
 
// Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = currentOffsets.map { case (tp, fo) =>
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}

可以看到DirectKafkaInputDStream的compute方法中,首先从Kafka集群获取数据的偏移量,然后利用获取偏移量创建RDD,这个Receiver的RDD创建方式不同。

总结:

而且KafkaRDDPartition只能属于一个topic,不能让partition跨多个topic,直接消费一个kafkatopic,topic不断进来、数据不断偏移,Offset代表kafka数据偏移量指针。

数据不断流进kafka,batchDuration假如每十秒都会从配置的topic中消费数据,每次会消费一部分直到消费完,下一个batchDuration会再流进来的数据,又可以从头开始读或上一个数据的基础上读取数据。

思考直接抓取kafka数据和receiver读取数据:

好处一:

直接抓取fakfa数据的好处,没有缓存,不会出现内存溢出等之类的问题。但是如果kafka Receiver的方式读取会存在缓存的问题,需要设置读取的频率和block interval等信息。

好处二:

采用receiver方式的话receiver默认情况需要和worker的executor绑定,不方便做分布式,当然可以配置成分布式,采用direct方式默认情况下数据会存在多个worker上的executor。Kafkardd数据默认都是分布在多个executor上的,天然数据是分布式的存在多个executor,而receiver就不方便计算。

好处三:

数据消费的问题,在实际操作的时候采用receiver的方式有个弊端,消费数据来不及处理即操作数据有deLay多才时,Spark Streaming程序有可能奔溃。但如果是direct方式访问kafka数据不会存在此类情况。因为diect方式直接读取kafka数据,如果delay就不进行下一个batchDuration读取。

好处四:

完全的语义一致性,不会重复消费数据,而且保证数据一定被消费,跟kafka进行交互,只有数据真正执行成功之后才会记录下来。

生产环境下强烈建议采用direct方式读取kafka数据。