dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)  // executed at the worker
  }
}

This version is wrong: the connection object is created on the driver but used inside rdd.foreach, so Spark has to serialize it and ship it to the executors, which typically fails because connection objects are not serializable (and even if they were, the connection would not be valid on another machine).
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

This version works, but it opens and closes a connection for every single record, which is very expensive. A better approach is to create one connection per partition with rdd.foreachPartition:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

With this pattern each partition connects to the external store only once, which is a big performance improvement. However, connections still cannot be reused across partitions. By using a connection pool, partitions (and successive batches) can share connections. The code looks like this:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
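ConnectionPool here is only a placeholder. As a rough illustration, a minimal Scala sketch of such a static, lazily initialized pool might look like the following; the getConnection/returnConnection names come from the snippet above, while the DBCP backend, URL, and credentials are assumptions borrowed from the MySQL example later in this post:

import java.sql.Connection
import org.apache.commons.dbcp.BasicDataSource

object ConnectionPool {
  // Built once per executor JVM, the first time getConnection() is called.
  private lazy val dataSource: BasicDataSource = {
    val ds = new BasicDataSource()
    ds.setDriverClassName("com.mysql.jdbc.Driver")
    ds.setUrl("jdbc:mysql://spark-master:3306/spark")
    ds.setUsername("root")
    ds.setPassword("vincent")
    ds.setMaxActive(50)  // illustrative pool size
    ds
  }

  def getConnection(): Connection = dataSource.getConnection()

  // A DBCP pooled connection goes back to the pool when it is closed.
  def returnConnection(connection: Connection): Unit = connection.close()
}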
Below is a complete example that writes Spark Streaming results to MySQL, using Apache Commons DBCP as the connection pool. First add the MySQL driver and DBCP dependencies to the project:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>
Next, create the database and table in MySQL:

mysql> create database spark;
Query OK, 1 row affected (0.01 sec)

mysql> use spark;
Database changed

mysql> show tables;
Empty set (0.01 sec)

mysql> create table searchKeyWord(insert_time date, keyword varchar(30), search_count integer);
Query OK, 0 rows affected (0.05 sec)
The connection pool helper class, ConnectPool.java:

package com.dt.spark.common;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.log4j.Logger;

/**
 * Description: database connection pool class
 * @author dinglq
 */
public class ConnectPool {
    private static Logger log = Logger.getLogger(ConnectPool.class);
    private static BasicDataSource bs = null;

    /**
     * Create the data source
     * @return
     */
    public static BasicDataSource getDataSource() throws Exception {
        if (bs == null) {
            bs = new BasicDataSource();
            bs.setDriverClassName("com.mysql.jdbc.Driver");
            bs.setUrl("jdbc:mysql://spark-master:3306/spark");
            bs.setUsername("root");
            bs.setPassword("vincent");
            bs.setMaxActive(200);                               // maximum number of concurrent connections
            bs.setInitialSize(30);                              // connections created when the pool is initialized
            bs.setMinIdle(50);                                  // minimum number of idle connections
            bs.setMaxIdle(200);                                 // maximum number of idle connections
            bs.setMaxWait(1000);                                // wait at most 1000 ms for a free connection
            bs.setMinEvictableIdleTimeMillis(60 * 1000);        // release connections that have been idle for 60 seconds
            bs.setTimeBetweenEvictionRunsMillis(5 * 60 * 1000); // check for dead connections every 5 minutes
            bs.setTestOnBorrow(true);
        }
        return bs;
    }

    /**
     * Shut down the data source
     */
    public static void shutDownDataSource() throws Exception {
        if (bs != null) {
            bs.close();
        }
    }

    /**
     * Get a database connection
     * @return
     */
    public static Connection getConnection() {
        Connection con = null;
        try {
            if (bs != null) {
                con = bs.getConnection();
            } else {
                con = getDataSource().getConnection();
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return con;
    }

    /**
     * Close connection resources
     */
    public static void closeCon(ResultSet rs, PreparedStatement ps, Connection con) {
        if (rs != null) {
            try {
                rs.close();
            } catch (Exception e) {
                log.error("Exception while closing ResultSet! " + e.getMessage(), e);
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (Exception e) {
                log.error("Exception while closing PreparedStatement! " + e.getMessage(), e);
            }
        }
        if (con != null) {
            try {
                con.close();
            } catch (Exception e) {
                log.error("Exception while closing Connection! " + e.getMessage(), e);
            }
        }
    }
}
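As a quick sanity check (not part of the original workflow), the pool can be exercised from a small standalone program before wiring it into Spark Streaming. The object name ConnectPoolTest below is hypothetical; it inserts one dummy row into the searchKeyWord table created above:

import java.sql.PreparedStatement
import com.dt.spark.common.ConnectPool

object ConnectPoolTest {
  def main(args: Array[String]): Unit = {
    val con = ConnectPool.getConnection
    val ps: PreparedStatement = con.prepareStatement(
      "insert into searchKeyWord (insert_time, keyword, search_count) values (now(), ?, ?)")
    ps.setString(1, "test")
    ps.setInt(2, 1)
    ps.executeUpdate()
    ConnectPool.closeCon(null, ps, con)  // closing the connection returns it to the DBCP pool
  }
}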
The Spark Streaming application, WriteDataToMySQL.scala:

package com.dt.spark.streaming

import com.dt.spark.common.ConnectPool
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Using a website hot-keyword ranking as an example, write the results to MySQL.
 * Created by dinglq on 2016/5/3.
 */
object WriteDataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WriteDataToMySQL")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Assume the socket input format is: searchKeyword,time
    val ItemsStream = ssc.socketTextStream("spark-master", 9999)
    // Turn every input line into (searchKeyword, 1)
    val ItemPairs = ItemsStream.map(line => (line.split(",")(0), 1))
    val ItemCount = ItemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
    //ssc.checkpoint("/user/checkpoints/")
    //val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int) => v1+v2, (v1:Int,v2:Int) => v1-v2, Seconds(60), Seconds(10))
    /**
     * Next we need to rank the keywords by frequency, but DStream has no sort method,
     * so we use transform and sort the underlying RDD with sortByKey.
     */
    val hottestWord = ItemCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })
    hottestWord.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val conn = ConnectPool.getConnection
        conn.setAutoCommit(false)  // commit manually
        val stmt = conn.createStatement()
        partitionOfRecords.foreach(record => {
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'" + record._1 + "','" + record._2 + "')")
        })
        stmt.executeBatch()
        conn.commit()  // commit the whole batch as one transaction
      })
    })
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
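The commented-out reduceByKeyAndWindow call above also passes an inverse function, which lets Spark update the window incrementally instead of recomputing it from scratch, but that variant only works with checkpointing enabled. A sketch of the change, reusing the checkpoint path from the commented-out line (the directory is a placeholder):

// drop-in replacement for the ItemCount line in main()
ssc.checkpoint("/user/checkpoints/")
val ItemCount = ItemPairs.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2,  // add counts of records entering the window
  (v1: Int, v2: Int) => v1 - v2,  // subtract counts of records leaving the window
  Seconds(60), Seconds(10))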
Start the socket data source with nc and type in some test records:

root@spark-master:~# nc -lk 9999
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333
Package the application and submit it, shipping the MySQL driver and DBCP jars with --jars:

root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.WriteDataToMySQL --jars=mysql-connector-java-5.1.38.jar,commons-dbcp-1.4.jar ./spark.jar
Check the results in MySQL:

mysql> select * from searchKeyWord;
+-------------+---------+--------------+
| insert_time | keyword | search_count |
+-------------+---------+--------------+
| 2016-05-03  | spark   | 4            |
| 2016-05-03  | hadoop  | 2            |
| 2016-05-03  | spark   | 4            |
| 2016-05-03  | hadoop  | 2            |
+-------------+---------+--------------+
4 rows in set (0.00 sec)

The same keywords show up more than once because every 10-second slide of the window writes a fresh batch of top keywords into the table.