Scala版SparkStreaming读写kafka,low level api模板代码存档

时间:2022-09-04 20:06:37

    spark streaming从kafka的某个topic拉取数据,处理完后再放入某个topic中的一个模板,不完整,作为参考,加了部分备注。    

    对于kafka的理解

       从黑盒上说可以理解为一个消息队列,也就是生产者(producer)将消息放入kafka,消费者(consumer)从kafka拉数据进行消费处理。而生产者可以放各种各样的消息,把这些消息按类别可以分为不同的topic,而消费者可以选择消费哪些topic。消费者消费的顺序通常而言就是传统的队列方式,也就是先进先出(FIFO)。

      从结构上来说,有一个图比较直观,kafka由producer,broker,consumer以及zookeeper组成。其中的broker我的理解就是用于缓存数据的,一个broker中可能有很多的partition,而一个partition可以理解为一个队列,里面存放着消息。一个topic可以存放在不同的partition中,让消息发送可以并发更有效率,而一个partition中的消息只能属于一个topic。而zookeeper的作用我认为就是来记录offset,某个topic存在哪个broker的哪个partition中,某个partition中存放消息的顺序之类的信息。

Scala版SparkStreaming读写kafka,low level api模板代码存档

   spark streaming我觉得这个如果写个脚本定期的起spark是否可以得到同样的效果?给我的感觉就是定期的起一下,拉一个时间窗口的数据然后进行处理,还得继续学习。

   其实很多地方还是不太理解的,比如要记录offset,保证重启后能从上次失败的地方开始读这种事还得研究。



SparkStreaming主程序模板

package jacob.sparkstreaming


import java.util

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions._

object SparkStreaming {

def main(args: Array[String]): Unit = {

// val topics = "jacob_topic" //读取kafka的topic
// val group = "jacob_group" //读取kafka的group
// val zkQuorum = "10.10.10.110:2181,10.10.10.111:2181/kafka-0.8.1" //zookeeper地址
// val brokerList = "10.10.10.196:8092,10.10.10.196:8092" // broker的地址

val Array(topics, group, zkQuorum,brokerList) = args
val sparkConf = new SparkConf().setAppName("sparkstreaming_kafka_test")//.setMaster("local[3]")
//配置spark, 打开setMaster("local[3]")为本地运行
SparkUtil.setSparkConfig(sparkConf)
val ssc = new StreamingContext(sparkConf, Seconds(5)) // spark streaming设置为5秒拉一次数据
val broadcastTest = ssc.sparkContext.broadcast(new BroadcastTest)// 可以根据需求设置一个广播的类
//kafka参数
val kafkaParam = Map[String,String]( //kafka低级api配置
"zookeeper.connect" -> zkQuorum, //配置zookeeper
"metadata.broker.list"->brokerList,
"group.id" -> group, //设置一下group id
"auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString, //从该topic最新的位置开始读数
"client.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000"
)
val topicSet = topics.split(",").toSet
val directKafka: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParam,topicSet)
val lines = directKafka.map(x => x._2) // kafka取出的数据,_1是其topic,_2是消息
lines.repartition(32).foreachRDD(rdd =>{ //重新划分partition,可以设置更多或者更少的并发,否则根据上游的partition数目分线程数
if(null!=rdd && !rdd.isEmpty()) {
//处理each RDD
processRDD(rdd,broadcastTest)
}else{
println("rdd is null!")
}

} )

ssc.start()
ssc.awaitTermination()
}
/**
* 处理每个rdd
*/
def processRDD(rdd:RDD[String],broadcastTest:Broadcast[BroadcastTest]): Unit ={
try {

rdd.filter(x => x != null).foreachPartition(partitionOfRecords => {
if (null != partitionOfRecords && (partitionOfRecords.nonEmpty)) {
processPartition(partitionOfRecords,broadcastTest)
}
})
}catch {
case e:Exception=>println("error:valid")
}
}

/**
* 处理每个partition
*/
def processPartition(partition:Iterator[String],broadcastTest:Broadcast[BroadcastTest]): Unit ={
val producer = KafkaUtil.getProducer //设置producer,要自己写完整,不展开了
partition.foreach(pair=> {
// process partition; 处理每一个消息的逻辑
//producer.send(..) 把处理完的数据放入kafka中,也是自己实现的
}

}



SparkConf设置举例

package jacob.util

import org.apache.spark.SparkConf

object SparkUtil {
def setSparkConfig(sparkConf: SparkConf): Unit ={

sparkConf.set("spark.akka.frameSize", "2047")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//一些默认的类使用kryo序列化
sparkConf.set("spark.kryoserializer.buffer.max.mb", "2040")
sparkConf.set("spark.files.overwrite","true")
sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
sparkConf.set("spark.eventLog.overwrite", "true")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition","30") //每秒钟最大消费,而kafka拉的数据为topic对应partition的数量乘以设置的数
}
}

集群启动的shell脚本举例

source /etc/profile
WORK_DIR=/opt/develop/jacob
cd ${WORK_DIR}

topic="jacob_topic"
group="jacob_group"
zk="10.10.10.110:2181,10.10.10.111:2181/kafka-0.8.1"
brokerlist="10.10.10.196:8092,10.10.10.196:8092"
#echo $topic,$group,$zk,$brokerlist

SPARK=/usr/bin/spark-submit
jar=$WORK_DIR/sparkstreaming.jar
${SPARK} --queue datacenter \
--class jacob.sparkstreaming.SparkStreaming \
--master yarn-cluster \
--driver-memory 15g \
--executor-memory 15g \
--executor-cores 16 \
--conf "spark.hadoop.mapreduce.input.fileinputformat.split.minsize=1073741824" \
--conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" \
$jar $topic $group $zk $brokerlist