
val data2Mysql2 = (iterator: Iterator[(String, Int)]) => {
var conn: Connection = null;
var ps: PreparedStatement = null
val sql = "Insert into location_info(location,counts,accesse_date) values(?,?,?)" try {
conn = DriverManager.getConnection("jdbc://localhist:3306/bigdata","root","root")
//整个分区的数据用了一个conn
iterator.foreach(line =>{
ps = conn.prepareStatement(sql)
ps.setString(,line._1)
ps.setInt(,line._2)
ps.setDate(,new Date(System.currentTimeMillis()))
ps.executeUpdate()
}) } catch {
case e: Exception => println("Mysql Exception")
} finally {
if (ps != null) ps.close()
if (conn != null) conn.close()
}
rddres2.foreachPartition(data2MySQL)
def mysql2Spark(){
val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
val connection = () => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
}
val jdbcRDD = new JdbcRDD(
sc,
connection,
//location_info(location,counts
"SELECT id, location FROM location_info where id >= ? AND id <= ?",
, , ,
r => {
val id = r.getInt()
val code = r.getString()
(id, code)
}
)
val jrdd = jdbcRDD.collect()
println(jdbcRDD.collect().toBuffer)
sc.stop()
}