spark中的RDD有很多对应的实现,比如JdbcRDD,是用来从MySQL中读取数据的。
先来看一下JdbsRDD的源码:
/**
* An RDD that executes a SQL query on a JDBC connection and reads results.
* For usage example, see test case JdbcRDDSuite.
*
* @param getConnection a function that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* For example,
* {{{
* select title, author from books where ? <= id and id <= ?
* }}}
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
* @param mapRow a function from a ResultSet to a single row of the desired result type(s).
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc, Nil) with Logging {
上面的内容主要是对参数的讲解:
翻译一下就是:
在JDBC连接上执行SQL查询并读取结果的RDD。
有关使用示例,请参见测试用例JdbcRDDSuite。
一个函数,返回一个打开的连接。
RDD负责关闭连接。
@param sql查询的文本。
查询必须包含两个?用于划分结果的参数的占位符。
例如,
{{{
选择书名,作者在哪里?<= id和id <= ?
}}}
@param下界第一个占位符的最小值
@param上界第二个占位符的最大值
下界和上界包括在内。
@param numpartition分区的数量。
给定下界为1,上界为20,分区数为2,
查询将执行两次,一次使用(1,10),一次使用(11,20)
@param mapRow函数从ResultSet到所需结果类型的一行。
这应该只调用getInt、getString等;接下来由RDD负责调用。
默认将ResultSet映射到对象数组。
由此我们可以知道new JdbcRDD时需要传入七个参数,这七个参数分别是:
1.一个SparkContext
2.一个用于打开jdbc连接的函数,这个函数没有参数,但是自动返回一个jdbc的连接
3.是一个sql语句,表示要从哪里读取数据,而且这个语句必须包含两个占位符(?)
4.下界,表示第一个占位符的最小值
5.上界,表示第二个占位符的最大值
6.分区个数
7.这里也是一个函数,表示读取数据的存放规则(默认将读取结果映射到对象数组里面),这里应该之调用getInt,getString等方法。
下面来写一个实例:
package XXX
import java.sql.{Connection, DriverManager}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object JdbcRddDemo {
//定义一个函数,无参,返回一个jdbc的连接(用于创建JdbcRDD的第二个参数)
val getConn: () => Connection = () => {
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","")
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JdbcRddDemo").setMaster("local[4]")
val sc = new SparkContext(conf)
//创建RDD,这个RDD会记录以后从MySQL中读数据
val jdbcRDD: JdbcRDD[(String, Int)] = new JdbcRDD(
sc, //SparkContext
getConn, //返回一个jdbc连接的函数
"SELECT * FROM access_log WHERE id >= ? AND id <= ?", //sql语句(要包含两个占位符)
1, //第一个占位符的最小值
5, //第二个占位符的最大值
2, //分区数量
rs => {
val province = rs.getString(1)
val count = rs.getInt(2)
(province, count) //将读取出来的数据保存到一个元组中
}
)
//触发Action
val result: Array[(String, Int)] = jdbcRDD.collect()
println(result.toBuffer)
sc.stop()
}
}
注意
加入这是我要读取的数据库中的内容:
前面的有一个id字段
执行上面的程序之后,结果应该是:
(1,陕西,1824),(2,重庆,868),(3,河北,383),(4,北京,1535),(5,云南,126)
当我们把JdbcRDD参数中的那一条sql语句改成如下内容:
"SELECT * FROM access_log WHERE id >= ? AND id < ?", //少一个=号
这时输出的结果就是:
(1,陕西,1824),(3,河北,383),(4,北京,1535)
当我把下面的分区设为1时,读取的结果是:
(1,陕西,1824),(2,重庆,868),(3,河北,383),(4,北京,1535)
造成这种结果的原因就是:
当我们指定RDD的分区个数为2时,他会创建两个Task来读取数据,第一个Task读取前两个数据,第二个Task读取后三个数据,这时两个Task都会判断一下这个id是否小于这个分区中最大的id号,所以就没有第二条和第五条数据了。