sparkstreaming整合kafka又两种方式(kafka0.8有支持两种方式,kafka0.10仅支持直连方式,舍弃了receive方式)
1,有receive方式。相当有接收者接收一个批次产生的数据。然后再进行计算。缺点:使用高级的消费API连接ZK,自动更新偏移量,但是效率低。
2,直连方式。相当与Task直接连到kafka对应topic分区上,以迭代器的方式一条一条读取数据。边读边计算,计算一个批次的时间,生成一个批次的小结果。该消费方式,不需要连接ZK,直接连接到broker上,但是需要自己维护偏移量。偏移量可以记录到Mysql,Redis,ZK中。下面的代码就是将记录的偏移量保存到ZK中。
消费端不依赖zookeeper来维护偏移量,需要自己写代码记录偏移量,这种方式优缺点:优点:不会造成数据丢失,因为直接连接kafka的分区。缺点代码复杂。
package cn.edu360.streaming
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
//kafka0.8版本
object KafkaDirectWordCount {
def main(args: Array[String]): Unit = {
//指定组名
val group = "g001"
//创建SparkConf
val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
//创建SparkStreaming,并设置间隔时间
val ssc = new StreamingContext(conf, Duration(5000))
//指定消费的 topic 名字
val topic = "wordcount"
//指定kafka的broker地址(sparkStream的Task直连到kafka的分区上,用更加底层的API消费,效率更高)
val brokerList = "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092"
//指定zk的地址,后期更新消费的偏移量时使用
val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
//创建 stream 时使用的 topic 名字集合
val topics: Set[String] = Set(topic)
//创建一个 ZKGroupTopicDirs 对象,其实是指定往zk中写入数据的目录,用于保存偏移量
val topicDirs = new ZKGroupTopicDirs(group, topic)
//获取 zookeeper 中的路径 "/g001/offsets/wordcount/"
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
//准备kafka的参数
val kafkaParams = Map(
"metadata.broker.list" -> brokerList,
"group.id" -> group,
"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
)
//zookeeper 的host 和 ip,创建一个 client,用于跟新偏移量量的
val zkClient = new ZkClient(zkQuorum)
//查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
// /g001/offsets/wordcount/0/10001"
// /g001/offsets/wordcount/1/30001"
// /g001/offsets/wordcount/2/10001"
//zkTopicPath -> /g001/offsets/wordcount/
val children = zkClient.countChildren(zkTopicPath)
var kafkaStream: InputDStream[(String, String)] = null
//如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
var fromOffsets: Map[TopicAndPartition, Long] = Map()
//如果保存过 offset
if (children > 0) {
for (i <- 0 until children) {
// /g001/offsets/wordcount/0/10001
val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
// wordcount/0
val tp = TopicAndPartition(topic, i)
//将不同 partition 对应的 offset 增加到 fromOffsets 中
// wordcount/0 -> 10001
fromOffsets += (tp -> partitionOffset.toLong)
}
//Key: wordcount values: "hello tom hello jerry"
//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
//通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)
//[String, String, StringDecoder, StringDecoder, (String, String)]
// key value key的解码方式 value的解码方式
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
} else {
//如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}
//偏移量的范围
var offsetRanges = Array[OffsetRange]()
//从kafka读取的消息,DStream的Transform方法可以将当前批次的RDD获取出来
//该transform方法计算获取到当前批次RDD的偏移量
val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>
//得到该 rdd 对应 kafka 的消息的 offset
//该RDD是一个KafkaRDD,可以获得偏移量的范围
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
val messages: DStream[String] = transform.map(_._2)
//依次迭代DStream中的RDD
messages.foreachRDD { rdd =>
//对RDD进行操作,触发Action
rdd.foreachPartition(partition =>
partition.foreach(x => {
println(x)
})
)
for (o <- offsetRanges) {
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
//将该 partition 的 offset 保存到 zookeeper
ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)
}
}
ssc.start()
ssc.awaitTermination()
}
}
kafka0.10版本
package cn.edu360.streaming.kafka10
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by zx on 2017/7/6.
*/
object DirectStream {
def main(args: Array[String]): Unit = {
//创建SparkConf,如果将任务提交到集群中,那么要去掉.setMaster("local[2]")
val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
//创建一个StreamingContext,其里面包含了一个SparkContext
val streamingContext = new StreamingContext(conf, Seconds(5))
//配置kafka的参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test123",
"auto.offset.reset" -> "earliest", // lastest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("xiaoniu")
//在Kafka中记录读取偏移量
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
//位置策略(可用的Executor上均匀分配分区)
LocationStrategies.PreferConsistent,
//消费策略(订阅固定的主题集合)
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//迭代DStream中的RDD(KafkaRDD),将每一个时间点对于的RDD拿出来
stream.foreachRDD { rdd =>
//获取该RDD对于的偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//拿出对应的数据
rdd.foreach{ line =>
println(line.key() + " " + line.value())
}
//异步更新偏移量到kafka中
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
streamingContext.start()
streamingContext.awaitTermination()
}
}
以下两种方式在实际生产环境中不使用。利用zookeeper来维护偏移量,不用自己再去维护,但是这种方式运行速度慢,这种方式不是直接从kafka的分区中获取数据,而是相当于拿一个容器,每隔一段时间取出,当写入大数据量时内存无法承受,容易造成内存溢出,数据丢失。
package cn.edu360.streaming
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StateFulKafkaWordCount {
/**
* 迭代器元组中的三个参数:
* 第一个参数:代表分组的KEY
* 第二个参数:当前批次Key对应的Value,由于有多个Value,那么会将这个批次的Value放到一个SEQ中
* 第三个参数:代表初始值或累加的中间结果
*/
val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
//it.map(tp => (tp._1, tp._2.sum + tp._3.getOrElse(0)))
it.map{ case (x, y, z) => (x, y.sum + z.getOrElse(0))}
}
def main(args: Array[String]): Unit = {
val sc = new SparkConf().setAppName("StateFulKafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sc, Seconds(5))
//如果要累加历史结果,一定要指定CheckPiont
ssc.checkpoint("./ck")
//传统低效的API,需要连ZK
//通过kafkaUtils创建kafkaStream
val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
val groupId = "g1"
//要消费的topic的名字,消费者的线程数量
val topic = Map[String, Int]("wordcount" -> 1)
//从kafka中拉取数据
//通过kafka创建DStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
//获取kafka的value即为真正存储的数据
val lines: DStream[String] = data.map(_._2)
val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
//触发Action
result.print()
//启动
ssc.start()
//等待优雅退出
ssc.awaitTermination()
}
}
方式二
package cn.edu360.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaWordCount {
def main(args: Array[String]): Unit = {
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sc, Seconds(5))
//传统低效的API,需要连ZK
//通过kafkaUtils创建kafkaStream
val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
val groupId = "g1"
//要消费的topic的名字,消费者的线程数量
val topic = Map[String, Int]("wordcount" -> 1)
//从kafka中拉取数据
//通过kafka创建DStream
val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
//获取kafka的value即为真正存储的数据
val lines: DStream[String] = data.map(_._2)
val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
//触发Action
result.print()
//启动
ssc.start()
//等待优雅退出
ssc.awaitTermination()
}
}