通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

时间:2023-01-15 20:46:16

文章显示好像有点问题,原文在http://lqding.blog.51cto.com/9123978/1769814

SparkStreaming的DStream提供了一个dstream.foreachRDD方法,该方法是一个功能强大的原始的API,它允许将数据发送到外部系统。然而,重要的是要了解如何正确有效地使用这种原始方法。一些常见的错误,以避免如下:

写数据到外部系统,需要建立一个数据连接对象(例如TCP连接到远程的服务器),使用它将数据发送到外部存储系统。为此开发者可能会在Driver中尝试创建一个连接,然后在worker中使用它来保存记录到外部数据。例如如下scala代码:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }}

上面的代码是一个错误的演示,因为连接是在Driver中创建的,而写数据是在worker中完成的。此时连接就需要被序列化然后发送到worker中。但是我们知道,连接的信息是不能被序列化和发序列化的(不同的机器连接服务器需要使用不同的服务器端口,即便连接被序列化了也不能使用)

进而我们可以将连接移动到worker中实现,代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}

但是此时,每处理一条数据记录,就需要连接一次外部系统,对于性能来说是个严重的问题。这也不是一个完美的实现

我们可以将代码做如下的改进:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}
这样一个partition,只需连接一次外部存储。性能上有大幅度的提高。但是不同的partition之间不能复用连接。我们可以使用连接池的方式,使得partition之间可以共享连接。代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }}

下面我们使用SparkStreaming实现将数据写到MySQL中:

在pom.xml中加入如下依赖包

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>

在MySql中创建数据库和表

1
2
3
4
5
6
7
8
9
10
mysql>  create  database  spark;
Query OK, 1 row affected (0.01 sec)
 
mysql> use spark;
Database  changed
mysql> show tables;
Empty  set  (0.01 sec)
 
mysql>  create  table  searchKeyWord(insert_time  date ,keyword  varchar (30),search_count  integer );
Query OK, 0  rows  affected (0.05 sec)

使用Java编写一个数据库连接池类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package  com.dt.spark.common;
 
import  java.sql.Connection;
import  java.sql.PreparedStatement;
import  java.sql.ResultSet;
 
import  org.apache.commons.dbcp.BasicDataSource;
import  org.apache.log4j.Logger;
 
/**
  * Description: 数据库连接池类
  * @author dinglq
  */
public  class  ConnectPool {
     private  static  Logger log = Logger.getLogger(ConnectPool. class );
     private  static  BasicDataSource bs =  null ;
 
     /**
      * 创建数据源
      * @return
      */
     public  static  BasicDataSource getDataSource()  throws  Exception{
         if (bs== null ){
             bs =  new  BasicDataSource();
             bs.setDriverClassName( "com.mysql.jdbc.Driver" );
             bs.setUrl( "jdbc:mysql://spark-master:3306/spark" );
             bs.setUsername( "root" );
             bs.setPassword( "vincent" );
             bs.setMaxActive( 200 ); //设置最大并发数
             bs.setInitialSize( 30 ); //数据库初始化时,创建的连接个数
             bs.setMinIdle( 50 ); //最小空闲连接数
             bs.setMaxIdle( 200 ); //数据库最大连接数
             bs.setMaxWait( 1000 );
             bs.setMinEvictableIdleTimeMillis( 60 * 1000 ); //空闲连接60秒中后释放
             bs.setTimeBetweenEvictionRunsMillis( 5 * 60 * 1000 ); //5分钟检测一次是否有死掉的线程
             bs.setTestOnBorrow( true );
         }
         return  bs;
     }
 
     /**
      * 释放数据源
      */
     public  static  void  shutDownDataSource()  throws  Exception{
         if (bs!= null ){
             bs.close();
         }
     }
 
     /**
      * 获取数据库连接
      * @return
      */
     public  static  Connection getConnection(){
         Connection con= null ;
         try  {
             if (bs!= null ){
                 con=bs.getConnection();
             } else {
                 con=getDataSource().getConnection();
             }
         catch  (Exception e) {
             log.error(e.getMessage(), e);
         }
         return  con;
     }
 
     /**
      * 关闭连接
      */
     public  static  void  closeCon(ResultSet rs,PreparedStatement ps,Connection con){
         if (rs!= null ){
             try  {
                 rs.close();
             catch  (Exception e) {
                 log.error( "关闭结果集ResultSet异常!" +e.getMessage(), e);
             }
         }
         if (ps!= null ){
             try  {
                 ps.close();
             catch  (Exception e) {
                 log.error( "预编译SQL语句对象PreparedStatement关闭异常!" +e.getMessage(), e);
             }
         }
         if (con!= null ){
             try  {
                 con.close();
             catch  (Exception e) {
                 log.error( "关闭连接对象Connection异常!" +e.getMessage(), e);
             }
         }
     }
}

编写Spark代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package  com.dt.spark.streaming
 
import  com.dt.spark.common.ConnectPool
import  org.apache.spark.SparkConf
import  org.apache.spark.streaming.{Seconds, StreamingContext}
 
 
/**
  * 以网站热词排名为例,将处理结果写到MySQL中
  * Created by dinglq on 2016/5/3.
  */
object  WriteDataToMySQL {
   def  main(args :  Array[String]) {
     val  conf  =  new  SparkConf().setAppName( "WriteDataToMySQL" )
     val  ssc  =  new  StreamingContext(conf,Seconds( 5 ))
     // 假设socket输入的数据格式为:searchKeyword,time
     val  ItemsStream  =  ssc.socketTextStream( "spark-master" , 9999 )
     // 将输入数据变成(searchKeyword,1)
     var  ItemPairs  =  ItemsStream.map(line  = >(line.split( "," )( 0 ), 1 ))
 
      val  ItemCount  =  ItemPairs.reduceByKeyAndWindow((v 1 : Int,v 2 : Int) = > v 1 +v 2 ,Seconds( 60 ),Seconds( 10 ))
     //ssc.checkpoint("/user/checkpoints/")
     // val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))
     /**
      * 接下来需要对热词的频率进行排序,而DStream没有提供sort的方法。那么我们可以实现transform函数,用RDD的sortByKey实现
      */
     val  hottestWord  =  ItemCount.transform(itemRDD  = > {
       val  top 3  =  itemRDD.map(pair  = > (pair. _ 2 , pair. _ 1 ))
         .sortByKey( false ).map(pair  = > (pair. _ 2 , pair. _ 1 )).take( 3 )
       ssc.sparkContext.makeRDD(top 3 )
     })
 
     hottestWord.foreachRDD(rdd  = > {
       rdd.foreachPartition(partitionOfRecords  = >{
         val  conn  =  ConnectPool.getConnection
         conn.setAutoCommit( false );   //设为手动提交
         val   stmt  =  conn.createStatement();
 
         partitionOfRecords.foreach( record  = > {
 
           stmt.addBatch( "insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'" +record. _ 1 + "','" +record. _ 2 + "')" );
         })
 
         stmt.executeBatch();
         conn.commit();   //提交事务
 
       })
     })
 
     ssc.start()
     ssc.awaitTermination()
     ssc.stop()
 
   }
}


打开netcat发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
root@spark-master:~ # nc -lk 9999
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333
hadoop,1111
spark,2222
spark,3333

运行spark代码

1
root@spark-master:~ # /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.WriteDataToMySQL  --jars=mysql-connector-java-5.1.38.jar,commons-dbcp-1.4.jar ./spark.jar

查看数据库中的结果:

1
2
3
4
5
6
7
8
9
10
mysql>  select  * from searchKeyWord;
+-------------+---------+--------------+
| insert_time | keyword | search_count |
+-------------+---------+--------------+
| 2016-05-03  | spark   |            4 |
| 2016-05-03  | hadoop  |            2 |
| 2016-05-03  | spark   |            4 |
| 2016-05-03  | hadoop  |            2 |
+-------------+---------+--------------+
4 rows  in  set  (0.00 sec)