spark-streaming programming (4): custom output with foreachRDD

foreachRDD lets you define your own logic for writing results to external systems such as HBase, MySQL, or HDFS.
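For a file system such as HDFS there is no connection to manage: foreachRDD can simply save each batch's RDD. A minimal sketch, assuming a DStream of word counts named counts (the output path is made up for illustration):

counts.foreachRDD { (rdd, time) =>
  // skip empty batches to avoid creating empty output directories
  if (!rdd.isEmpty()) {
    // one output directory per batch, keyed by the batch timestamp
    rdd.saveAsTextFile(s"hdfs:///tmp/wordcounts-${time.milliseconds}")
  }
}

(DStream also ships a built-in saveAsTextFiles that does essentially this.)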

When writing to a database, the wrong approach is to open a connection for every record, which causes serious performance problems. The correct pattern is to open one connection per partition of each RDD in the DStream and release it once that partition's records have been written.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // get a connection from a pool (one connection per partition)
    val connection = ConnectionPool.getConnection()
    // write every record in this partition
    partitionOfRecords.foreach(record => connection.send(record))
    // return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
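Note that ConnectionPool is not a Spark API: the Spark programming guide assumes a static, lazily initialized pool of connections that you provide yourself. Also note the structure: code inside foreachRDD but outside foreachPartition runs on the driver, so a connection created there would have to be serialized and shipped to the executors, which usually fails; creating it inside foreachPartition keeps it on the executor. A minimal sketch of such a pool (the JDBC URL and credentials are placeholders):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical static pool: one instance per executor JVM, shared across tasks.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection =
    // reuse an idle connection if one is available, otherwise open a new one
    Option(pool.poll()).getOrElse(
      DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password"))

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}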

A simple end-to-end example (word count from Kafka, results written to MySQL):

package com.lgh.sparkstreaming

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
* Created by Administrator on 2017/8/23.
*/

object ForeachRDD {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // args: ZooKeeper quorum, consumer group name, topic names (comma-separated if more than one), thread count
    val Array(zkQuorum, group, topics, numThreads) = args
    // setMaster("local[2]") is for local debugging; drop it when submitting to a cluster
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // map of topic name -> number of consumer threads reading that topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // args: StreamingContext, Kafka's ZooKeeper quorum, consumer group, topic map
    val kafkamessage = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // each Kafka record is a (key, message) pair; _._2 extracts the message body
    val lines = kafkamessage.map(_._2)

    val words = lines.flatMap(_.split(" "))
    val wordCounts: DStream[(String, Long)] = words.map(x => (x, 1L))
      .reduceByKey(_ + _)

    // write each partition to MySQL over JDBC
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { rddpartition =>
        // register the MySQL JDBC driver and open one connection for this partition
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password")
        // prepare the statement once and reuse it for every record in the partition;
        // `key` is a reserved word in MySQL, hence the backquoted column names
        val prep = conn.prepareStatement("INSERT INTO t1 (`key`, `value`) VALUES (?, ?)")
        try {
          rddpartition.foreach { record =>
            prep.setString(1, record._1)
            prep.setLong(2, record._2)
            prep.executeUpdate()
          }
        } finally {
          // release resources even if an insert fails
          prep.close()
          conn.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
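One refinement worth noting: the per-record executeUpdate above costs a database round trip for every record. A common alternative is JDBC batching. Here is a sketch of the same per-partition write under the same assumptions (hypothetical t1 table, placeholder credentials), flushing every batchSize records to keep memory bounded:

import java.sql.DriverManager

object BatchedWriter {
  // Hypothetical helper: write one partition's records with JDBC batching.
  def writePartition(records: Iterator[(String, Long)], batchSize: Int = 500): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password")
    val prep = conn.prepareStatement("INSERT INTO t1 (`key`, `value`) VALUES (?, ?)")
    try {
      var n = 0
      records.foreach { case (k, v) =>
        prep.setString(1, k)
        prep.setLong(2, v)
        prep.addBatch()
        n += 1
        if (n % batchSize == 0) prep.executeBatch() // send the accumulated inserts together
      }
      prep.executeBatch() // flush the remainder
    } finally {
      prep.close()
      conn.close()
    }
  }
}

It plugs into the example above as wordCounts.foreachRDD(rdd => rdd.foreachPartition(it => BatchedWriter.writePartition(it))).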