foreachRDD lets you write results to any external system you choose, such as HBase, MySQL, or HDFS.
For database-style connections, the wrong approach is to open a new connection for every single record, which causes severe performance problems. The correct usage is to open one connection per DStream partition and release it once that partition's records have been written.
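For contrast, here is a minimal sketch of the per-record anti-pattern, in the pseudocode style of the Spark programming guide (createNewConnection and connection.send are placeholder helpers, not real APIs):

// Anti-pattern: connection setup and teardown run once per record
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection() // hypothetical factory; expensive per-record call
    connection.send(record)                // hypothetical write helper
    connection.close()
  }
}

The per-partition version below amortizes that cost across the whole partition: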
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Take one connection from the pool for this partition
    val connection = ConnectionPool.getConnection()
    // Write every record in the partition over that single connection
    partitionOfRecords.foreach(record => connection.send(record))
    // Return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
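The ConnectionPool above is pseudocode; Spark does not ship one. A minimal illustrative sketch backed by java.sql is below (the JDBC URL and credentials are the same placeholders used in the full example that follows; a production job would more likely use a real pooling library such as HikariCP):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Minimal sketch of a connection pool. Declared as an object, so it is
// created lazily and shared by all tasks inside one executor JVM.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    // Reuse a pooled connection if one is available and still open
    if (conn != null && !conn.isClosed) conn
    else DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password")
  }

  def returnConnection(conn: Connection): Unit = pool.offer(conn)
}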
A simple end-to-end example (a Kafka word count whose results are written to MySQL):
package com.lgh.sparkstreaming

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object ForeachRDD {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    // Arguments: ZooKeeper quorum, consumer group, comma-separated topic list, thread count
    val Array(zkQuorum, group, topics, numThreads) = args
    // setMaster("local[2]") is for local debugging; remove it when submitting to a cluster
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    // Map of topic name -> number of consumer threads reading that topic
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Arguments: StreamingContext, Kafka's ZooKeeper address, consumer group, topic map
    val kafkamessage = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Each element is a (key, message) pair; _._2 extracts the actual message payload
    val lines = kafkamessage.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts: DStream[(String, Long)] = words.map(x => (x, 1L))
      .reduceByKey(_ + _)
    // Write the counts to MySQL, one connection per partition
    wordCounts.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          rddpartition => {
            // Open a single MySQL connection for this partition
            Class.forName("com.mysql.jdbc.Driver").newInstance
            val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password")
            // Prepare the statement once per partition; `key` is a reserved word
            // in MySQL, so it must be backquoted
            val prep = conn.prepareStatement("INSERT INTO t1 (`key`, `value`) VALUES (?, ?)")
            rddpartition.foreach(record => {
              prep.setString(1, record._1)
              prep.setLong(2, record._2)
              prep.executeUpdate
            })
            // Release resources once the partition has been written
            prep.close()
            conn.close()
          }
        )
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
}
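One further refinement worth knowing: calling executeUpdate once per record still pays a network round trip per insert. Below is a sketch of the same foreachPartition body using standard JDBC batching instead (same placeholder table and credentials as above; the batch size of 500 is an arbitrary choice):

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { rddpartition =>
    Class.forName("com.mysql.jdbc.Driver").newInstance
    val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test", "username", "password")
    val prep = conn.prepareStatement("INSERT INTO t1 (`key`, `value`) VALUES (?, ?)")
    var buffered = 0
    rddpartition.foreach { record =>
      prep.setString(1, record._1)
      prep.setLong(2, record._2)
      prep.addBatch()                               // buffer the insert instead of executing it immediately
      buffered += 1
      if (buffered % 500 == 0) prep.executeBatch()  // flush every 500 records
    }
    prep.executeBatch()                             // flush the remainder
    prep.close()
    conn.close()
  }
}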