spark之JDBCRDD--从Mysql中读取数据

时间:2024-03-20 10:08:15

spark中的RDD有很多对应的实现,比如JdbcRDD,是用来从MySQL中读取数据的。
先来看一下JdbsRDD的源码:

/**
 * An RDD that executes a SQL query on a JDBC connection and reads results.
 * For usage example, see test case JdbcRDDSuite.
 *
 * @param getConnection a function that returns an open Connection.
 *   The RDD takes care of closing the connection.
 * @param sql the text of the query.
 *   The query must contain two ? placeholders for parameters used to partition the results.
 *   For example,
 *   {{{
 *   select title, author from books where ? <= id and id <= ?
 *   }}}
 * @param lowerBound the minimum value of the first placeholder
 * @param upperBound the maximum value of the second placeholder
 *   The lower and upper bounds are inclusive.
 * @param numPartitions the number of partitions.
 *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 *   the query would be executed twice, once with (1, 10) and once with (11, 20)
 * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
 *   This should only call getInt, getString, etc; the RDD takes care of calling next.
 *   The default maps a ResultSet to an array of Object.
 */
class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging {

上面的内容主要是对参数的讲解:
翻译一下就是:

在JDBC连接上执行SQL查询并读取结果的RDD。
有关使用示例,请参见测试用例JdbcRDDSuite。
一个函数,返回一个打开的连接。
RDD负责关闭连接。
@param sql查询的文本。
查询必须包含两个?用于划分结果的参数的占位符。
例如,
{{{
选择书名,作者在哪里?<= id和id <= ?
}}}
@param下界第一个占位符的最小值
@param上界第二个占位符的最大值
下界和上界包括在内。
@param numpartition分区的数量。
给定下界为1,上界为20,分区数为2,
查询将执行两次,一次使用(1,10),一次使用(11,20)
@param mapRow函数从ResultSet到所需结果类型的一行。
这应该只调用getInt、getString等;接下来由RDD负责调用。
默认将ResultSet映射到对象数组。

由此我们可以知道new JdbcRDD时需要传入七个参数,这七个参数分别是:
1.一个SparkContext
2.一个用于打开jdbc连接的函数,这个函数没有参数,但是自动返回一个jdbc的连接
3.是一个sql语句,表示要从哪里读取数据,而且这个语句必须包含两个占位符(?)
4.下界,表示第一个占位符的最小值
5.上界,表示第二个占位符的最大值
6.分区个数
7.这里也是一个函数,表示读取数据的存放规则(默认将读取结果映射到对象数组里面),这里应该之调用getInt,getString等方法。

下面来写一个实例:

package XXX

import java.sql.{Connection, DriverManager}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRddDemo {
//定义一个函数,无参,返回一个jdbc的连接(用于创建JdbcRDD的第二个参数)
  val getConn: () => Connection = () => {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","")
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("JdbcRddDemo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    //创建RDD,这个RDD会记录以后从MySQL中读数据

    val jdbcRDD: JdbcRDD[(String, Int)] = new JdbcRDD(
      sc,            //SparkContext
      getConn,        //返回一个jdbc连接的函数
      "SELECT * FROM access_log WHERE id >= ? AND id <= ?",  //sql语句(要包含两个占位符)
      1,      //第一个占位符的最小值
      5,    //第二个占位符的最大值
      2, //分区数量
      rs => {
        val province = rs.getString(1)
        val count = rs.getInt(2)
        (province, count)           //将读取出来的数据保存到一个元组中
      }
    )

    //触发Action
    val result: Array[(String, Int)] = jdbcRDD.collect()
    println(result.toBuffer)

    sc.stop()

  }

}

注意

加入这是我要读取的数据库中的内容:
前面的有一个id字段
spark之JDBCRDD--从Mysql中读取数据
执行上面的程序之后,结果应该是:
(1,陕西,1824),(2,重庆,868),(3,河北,383),(4,北京,1535),(5,云南,126)

当我们把JdbcRDD参数中的那一条sql语句改成如下内容:

 "SELECT * FROM access_log WHERE id >= ? AND id < ?",  //少一个=号

这时输出的结果就是:
(1,陕西,1824),(3,河北,383),(4,北京,1535)
当我把下面的分区设为1时,读取的结果是:
(1,陕西,1824),(2,重庆,868),(3,河北,383),(4,北京,1535)

造成这种结果的原因就是:

当我们指定RDD的分区个数为2时,他会创建两个Task来读取数据,第一个Task读取前两个数据,第二个Task读取后三个数据,这时两个Task都会判断一下这个id是否小于这个分区中最大的id号,所以就没有第二条和第五条数据了。
spark之JDBCRDD--从Mysql中读取数据