spark-streaming-[7]-Output Operations on DStreams: Writing to MySQL with foreachRDD

Date: 2021-11-25 00:49:33

Reference: legotime, "SparkStreaming之foreachRDD"

1. Overview of Output Operations on DStreams: foreachRDD

foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

foreachRDD on a DStream is a very powerful primitive: it lets you push data out to external systems. Because the output operations are what actually allow an external system to consume the transformed data, they are what trigger the execution of the DStream transformations, so it is important to understand how to use this primitive correctly and efficiently. There are some common mistakes to avoid. Writing data to an external system usually requires creating a connection object (e.g., a TCP connection to a remote server, or a database handle) and using it to send the data. A developer may inadvertently create the connection object once at the Spark driver and then try to use it on the Spark workers to save the records in the RDDs, as in the following:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

This is incorrect: it requires the connection object to be serialized and sent from the driver to the workers, and such connection objects are rarely transferable across machines. The mistake may show up as a serialization error (the connection object is not serializable) or an initialization error (the connection object needs to be initialized at the workers), and so on. The correct solution is to create the connection object at the worker. However, this can lead to another common mistake: creating a new connection for every record. For example:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

Typically, creating a connection object has time and resource overheads, so creating and destroying one for every record incurs unnecessarily high costs and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition: create a single connection object per RDD partition and use that connection to send all of the records in the partition.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

Finally, this can be optimized further by reusing connection objects across multiple RDDs/batches. A developer can maintain a static pool of connection objects that is reused as the RDDs of multiple batches are pushed to the external system, further reducing the overheads:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
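
The ConnectionPool used above is referenced but not defined by Spark. A minimal sketch of such a static, lazily initialized pool is shown below; the JDBC URL and credentials are placeholder assumptions, and java.util.concurrent.ConcurrentLinkedQueue serves as the idle queue. (The simpler ConnectionPool helper in the test code of Section 3 takes a different approach and opens a fresh connection per statement.)

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// A sketch of a static, lazily initialized connection pool: connections are
// created on demand and parked in an idle queue for reuse across batches.
object ConnectionPool {
  // Placeholder JDBC URL and credentials, replace with real values
  private val url = "jdbc:mysql://localhost:3306/test?user=root&password=xxx"
  private val idle = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = idle.poll() // reuse an idle connection if one is available
    if (conn != null) conn else DriverManager.getConnection(url)
  }

  def returnConnection(conn: Connection): Unit = {
    idle.offer(conn) // park the connection for future reuse
  }
}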

Notes:

(1) DStreams are executed lazily by the output operations, just as RDDs are lazily executed by RDD actions. Specifically, it is the RDD actions inside the DStream output operations that force the processing of the received data. Hence, if your application has no output operations, or has output operations such as dstream.foreachRDD() without any RDD action inside them, nothing will get executed: the system will simply receive the data and discard it.
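
For example (a sketch, assuming dstream carries String records), the first foreachRDD below contains only a transformation and no RDD action, so the batches are received but never processed; adding an action such as foreach forces the computation:

dstream.foreachRDD { rdd =>
  rdd.map(record => record.toUpperCase) // transformation only: nothing is executed
}

dstream.foreachRDD { rdd =>
  rdd.map(record => record.toUpperCase)
    .foreach(println) // RDD action: forces each batch to be computed
}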

(2) By default, output operations are executed one at a time, in the order in which they are defined in the application.
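
For example, with two output operations on the same DStream (assuming a wordCounts DStream as in the test below), each batch is printed first and then saved, matching the definition order:

wordCounts.print()                       // runs first for each batch
wordCounts.saveAsTextFiles("wordCounts") // then this, in definition order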

2. Test

The simulator StreamingSimulation sends one line of data per second. Streaming2Mysql reads it via socketTextStream (with a 1-second batch interval, each batch contains exactly one line of words to count), computes the word counts in real time, and writes the results to MySQL.

  • Step 1: start the simulator StreamingSimulation below first, then run Streaming2Mysql
  • (program arguments: ./srcFile/NetworkWordCountData.txt    9999   1000)

NetworkWordCountData.txt contains the following:

hello hello
hello hello
hello hello
hello hello
hello hello
hello hello
hello hello

StreamingSimulation output:
Got client connect from: /127.0.0.1
-------------------------------------------
Time: 1493880885133 ms
-------------------------------------------
hello hello
-------------------------------------------
Time: 1493880886137 ms
-------------------------------------------
hello hello
-------------------------------------------
Time: 1493880887140 ms
-------------------------------------------
hello hello
-------------------------------------------
Time: 1493880888142 ms
-------------------------------------------
hello hello
-------------------------------------------
Time: 1493880889148 ms
-------------------------------------------
hello hello
-------------------------------------------
Time: 1493880890152 ms 
  • Step 2: run Streaming2Mysql to compute the counts in real time and write them to MySQL
  • MySQL table DDL:
    CREATE TABLE `streaming_keywordcnt_test` (
      `insert_time` varchar(50) DEFAULT NULL,
      `keyword` varchar(30) DEFAULT NULL,
      `count` int(11) DEFAULT NULL
    )
  • Result:
mysql> select * from streaming_keywordcnt_test;
+---------------+---------+-------+
| insert_time   | keyword | count |
+---------------+---------+-------+
| 1493880885373 | hello   |     2 |
| 1493880886019 | hello   |     2 |
| 1493880887016 | hello   |     2 |
| 1493880888020 | hello   |     2 |
| 1493880889016 | hello   |     2 |
| 1493880890025 | hello   |     2 |
| 1493880891019 | hello   |     2 |
| 1493880892016 | hello   |     2 |
| 1493880893018 | hello   |     2 |
| 1493880894018 | hello   |     2 |
| 1493880895017 | hello   |     2 |
| 1493880896015 | hello   |     2 |
| 1493880897017 | hello   |     2 |
| 1493880898015 | hello   |     2 |
| 1493880899016 | hello   |     2 | 
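
Each 1-second batch contains exactly one line, "hello hello", so every row records a count of 2 for the keyword hello.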

3. Code Examples

1-Simulator StreamingSimulation

package com.dt.spark.main.Streaming

import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

/**
  * Created by hjw on 17/5/1.
  */
object StreamingSimulation {
  /*
  Return a random index in [0, length)
   */
  def index(length:Int) ={
    import java.util.Random
    val rdm = new Random()
    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    if (args.length != 3){
      System.err.println("Usage: <filename><port><millisecond>")
      System.exit(1)
    }

    val filename = args(0)
    val lines = Source.fromFile(filename).getLines().toList
    val fileRow = lines.length

    val listener = new ServerSocket(args(1).toInt)

    // Listen on the given port and accept each client connection
    while(true){
      val socket = listener.accept()
      new Thread(){
        override def run() = {
          println("Got client connect from: " + socket.getInetAddress)
          val out =  new PrintWriter(socket.getOutputStream,true)
          while(true){
            Thread.sleep(args(2).toLong)
            // Send a randomly chosen line to the client
            val content = lines(index(fileRow))
            println("-------------------------------------------")
            println(s"Time: ${System.currentTimeMillis()} ms")
            println("-------------------------------------------")
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}
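
Each client connection is served on its own thread; the third command-line argument is the send interval in milliseconds, so with the 1000 used above the simulator emits one random line per second, matching the 1-second batch interval in Streaming2Mysql.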

2-ConnectionPool

package com.dt.spark.main.Streaming.Streaming2Mysql.Utils

import java.sql.{Statement, Connection, DriverManager}

import org.apache.log4j.Logger

/**
  * Created by hjw on 17/5/4.
  */
object ConnectionPool {
  val log = Logger.getLogger(ConnectionPool.this.getClass)

  // Execute a single SQL statement. A new connection is opened and closed
  // on every call, so despite the object's name this is not a true pool.
  def mysqlExe(sql: String) {
    var conn: Connection = null
    val url: String = "jdbc:mysql://<mysql-host:port>/<database>?" + "user=root&password=xxx&useUnicode=true&characterEncoding=UTF8"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url)
      val stmt: Statement = conn.createStatement
      stmt.executeUpdate(sql)
    }
    catch {
      case e: Exception => {
        e.printStackTrace
      }
    } finally {
      try {
        if (conn != null) conn.close()
      } catch {
        case e: Exception => // ignore errors on close
      }
    }
  }
}
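
Because mysqlExe opens its own connection, the foreachPartition in Streaming2Mysql below still pays the connection cost once per record rather than once per partition. It keeps the example simple; a production job would create the connection once per partition, or borrow one from a real pool as sketched in Section 1, and reuse it across the records.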

3-Streaming2Mysql

package com.dt.spark.main.Streaming.Streaming2Mysql

import com.dt.spark.main.Streaming.Streaming2Mysql.Utils.ConnectionPool
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by hjw on 17/5/4.
  */
object Streaming2Mysql {
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val path = "pathofcheckpoint"  // checkpoint directory (placeholder path)

    def NetworkWordStreaming(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[5]").setAppName("NetworkWordCount")
      val ssc = new StreamingContext(conf, Seconds(1))
      // Create a DStream that will connect to hostname:port, like localhost:9999
      val lines = ssc.socketTextStream("localhost", 9999)

      // Split each line into words
      val words = lines.flatMap(_.split(" "))
      // Count each word in each batch, dropping empty tokens first
      val pairs = words.filter(_.nonEmpty).map(word => (word, 1))
      val wordCounts = pairs.reduceByKey(_ + _)
      // Print the first ten elements of each RDD generated in this DStream to the console
      wordCounts.print()

      // Write each batch's counts to MySQL, one partition at a time
      wordCounts.foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          partitionOfRecords.foreach { record =>
            ConnectionPool.mysqlExe(
              s"insert into streaming_keywordcnt_test(insert_time, keyword, count) " +
              s"values ('${System.currentTimeMillis()}', '${record._1}', ${record._2})")
          }
        }
      }
      // enable checkpointing
      ssc.checkpoint(path)
      ssc
    }

    // Restore the StreamingContext from the checkpoint directory if one exists,
    // otherwise create it; createOnError = true creates a fresh context if recovery fails
    val ssc = StreamingContext.getOrCreate(path, NetworkWordStreaming, createOnError = true)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}