Receiver and no-receiver (direct) approaches
The wrapper is necessarily an RDD, here KafkaRDD; Spark provides a different RDD for each kind of data source.
Inside foreachRDD you get the RDD produced for the current batch duration, and with it the data of every partition that RDD covers.
For enterprise-grade Spark Streaming applications, the no-receivers approach is recommended. Its advantages:
1. Finer control over how the data is read
2. Semantic consistency
The no-receivers approach matches how data is read and operated on more closely: at the bottom of the Spark computation framework there is a data source, and operating on that source directly (direct) is the more natural fit. The wrapper around the data source must be at the RDD level.
So Spark introduced a custom RDD, KafkaRDD; only the data source differs.
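For reference, a minimal sketch of the direct approach (assuming Spark 1.x with the Kafka 0.8 spark-streaming-kafka artifact on the classpath; the broker address and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker list
    val topics = Set("testTopic")                                    // placeholder topic

    // Each batch duration produces one KafkaRDD; its partitions map 1:1 to Kafka partitions.
    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // foreachRDD hands us the KafkaRDD of the current batch duration;
    // foreachPartition then works on each Kafka partition's data directly.
    directStream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach { case (_, value) => println(value) }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}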
KafkaRDD.scala
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple

  /** Kafka TopicAndPartition object, for convenience */
  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)

  /** Number of messages this OffsetRange refers to */
  def count(): Long = untilOffset - fromOffset

  override def equals(obj: Any): Boolean = obj match {
    case that: OffsetRange =>
      this.topic == that.topic &&
        this.partition == that.partition &&
        this.fromOffset == that.fromOffset &&
        this.untilOffset == that.untilOffset
    case _ => false
  }
  override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    // TODO is additional hostname resolution necessary here
    Seq(part.host)
  }

  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
  }
  private class KafkaRDDIterator(
      // the iterator that actually fetches the data from Kafka
      part: KafkaRDDPartition,
      context: TaskContext) extends NextIterator[R] {

    context.addTaskCompletionListener { context => closeIfNeeded() }

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val kc = new KafkaCluster(kafkaParams)

    def connect(host: String, port: Int): SimpleConsumer =
      new SimpleConsumer(host, port, config.socketTimeoutMs,
        config.socketReceiveBufferBytes, config.clientId)

    override def getNext(): R = {
      if (iter == null || !iter.hasNext) {
        iter = fetchBatch
      }
      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {
        val item = iter.next()
        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
KafkaUtils.scala
Kafka input DStreams are normally created through KafkaUtils.
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
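A hedged sketch of calling this overload with explicit starting offsets and a custom messageHandler (reusing the ssc and kafkaParams from the earlier sketch; the topic, partition, and offset values are illustrative only):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Start reading partition 0 of the placeholder topic at offset 0.
val fromOffsets = Map(TopicAndPartition("testTopic", 0) -> 0L)

// The messageHandler decides what each Kafka record becomes; here a (key, value) pair.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)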
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[K]: ClassTag,
  T <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)  // retry once by default

  // rate controller: governs the read rate (backpressure)
  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }
  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }
  private[kafka] def getFromOffsets(
      kc: KafkaCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]
    ): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else {
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) =>
        (tp, lo.offset)
      }
    }
    KafkaCluster.checkErrors(result)
  }
KafkaRDDPartition
// effectively a pointer into the Kafka data source
private[kafka]
class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String,
  val port: Int
) extends Partition {
  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}
Comparing fetching Kafka data directly versus reading it through a receiver:
Benefit 1: The direct approach has no receiver-side cache, so there is no risk of out-of-memory from buffered data.
Benefit 2: A receiver is bound to a specific executor/worker, which makes the receiver approach awkward to distribute. By default the KafkaRDD's data is spread across multiple executors.
Benefit 3: Data consumption. In practice the receiver approach has a drawback: when data arrives faster than it can be processed, i.e. when processing keeps lagging behind, the Spark Streaming application may crash. This does not happen with the direct approach, because it reads Kafka directly: if processing is delayed, the next batchDuration simply does not read more data.
Benefit 4: Full semantic consistency: data is neither consumed twice nor lost. The application interacts with Kafka directly, and offsets are recorded only after the data has actually been processed successfully (see the sketch after this list).
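A minimal sketch of benefit 4, reusing directStream from the earlier sketch: the offsets come from the KafkaRDD itself via HasOffsetRanges, and saveOffsets is a hypothetical helper (e.g. writing to ZooKeeper or a database) invoked only after the batch's output has succeeded.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  // The KafkaRDD carries its own offset ranges.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { records =>
    records.foreach { case (_, value) => println(value) }  // the real output action goes here
  }

  // Record offsets only after the output above has completed successfully.
  offsetRanges.foreach { o =>
    saveOffsets(o.topic, o.partition, o.fromOffset, o.untilOffset)  // hypothetical helper
  }
}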
In production, the direct approach to reading Kafka data is strongly recommended.
The backpressure setting lets Spark probe whether the incoming data rate matches the processing capability, as sketched below.
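A sketch of the relevant configuration (assuming Spark 1.5+; the numeric values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DirectWithBackpressure")
  .set("spark.streaming.backpressure.enabled", "true")       // let the rate controller estimate a suitable ingestion rate
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // cap: max records per second per Kafka partition
  .set("spark.streaming.kafka.maxRetries", "1")              // leader-offset fetch retries (default 1, as seen above)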