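1. Use c3p0
The main reason is that c3p0's data source class is serializable, so it can be shipped directly to the Workers. JDBC connections themselves are not serializable, so what reaches a Worker is the pool configuration, and connections are opened from it there.
ComboPooledDataSource
This class creates the database connection instances; once it reaches a Worker, it can be used there directly.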
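A related pattern, shown here with the JDK only, makes the same idea explicit without relying on c3p0's own serialization: only the configuration is shipped (java.util.Properties is serializable), and each executor JVM builds its pool lazily the first time it needs one. ShipConfig, poolFor and the queue standing in for a real pool are hypothetical names for this sketch, not c3p0 API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Properties
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

object ShipConfig {
  // Round-trips the config through Java serialization, the way Spark ships closure contents
  def roundTrip(conf: Properties): Properties = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(conf) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Properties] finally in.close()
  }

  // Executor side: at most one pool per JDBC URL per JVM, created on first use.
  // The "pool" is a placeholder queue; a real version would build a ComboPooledDataSource here.
  private val pools = new ConcurrentHashMap[String, ArrayBlockingQueue[String]]()

  def poolFor(conf: Properties): ArrayBlockingQueue[String] =
    pools.computeIfAbsent(
      conf.getProperty("url"),
      (_: String) => new ArrayBlockingQueue[String](conf.getProperty("maxPoolSize").toInt))

  // Number of distinct pools created in this JVM so far
  def poolCount: Int = pools.size
}
```

Keeping the pools in a JVM-wide map means that all tasks running on the same executor share one pool, however many times the configuration is deserialized.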
2. Application code
Getting the DataSource:
import com.mchange.v2.c3p0.ComboPooledDataSource

// Builds a c3p0 pool from a JSON config file.
// FileUtils.readJsonFile2Prop is a project-specific helper (not shown) that returns java.util.Properties.
def getC3p0DataSource(filename: String, config: String): ComboPooledDataSource = {
  val dataSource = new ComboPooledDataSource(true)
  val conf = FileUtils.readJsonFile2Prop(filename, config)
  // Connection settings
  dataSource.setJdbcUrl(conf.getProperty("url"))
  dataSource.setDriverClass(conf.getProperty("driverClassName"))
  dataSource.setUser(conf.getProperty("username"))
  dataSource.setPassword(conf.getProperty("password"))
  // Pool sizing; each key must be present in the config, otherwise toInt throws
  dataSource.setMaxPoolSize(conf.getProperty("maxPoolSize").toInt)
  dataSource.setMinPoolSize(conf.getProperty("minPoolSize").toInt)
  dataSource.setAcquireIncrement(conf.getProperty("acquireIncrement").toInt)
  dataSource.setInitialPoolSize(conf.getProperty("initialPoolSize").toInt)
  // Seconds an unused connection may stay idle before it is discarded (0 = never)
  dataSource.setMaxIdleTime(conf.getProperty("maxIdleTime").toInt)
  dataSource
}
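Note: keep initialPoolSize small. Every executor ends up with at least one pool of its own, and each pool opens initialPoolSize connections when it is first used, so the number of connections opened at startup grows with the number of executors.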
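To put a number on "not too large": assuming one pool per executor JVM, the connections opened at startup are executors * initialPoolSize, and the worst case is executors * maxPoolSize, which has to fit under the server's max_connections (151 by default in MySQL). The sketch below reads the three size settings and checks them. PoolSizing and its methods are hypothetical, and load parses key=value text as a stand-in for the author's JSON helper.

```scala
import java.io.StringReader
import java.util.Properties

object PoolSizing {
  final case class Sizes(minPoolSize: Int, initialPoolSize: Int, maxPoolSize: Int)

  // Reads the size settings, failing with a message that names any missing key
  def read(conf: Properties): Sizes = {
    def int(key: String): Int =
      Option(conf.getProperty(key)).map(_.trim).filter(_.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"missing pool setting: $key"))
        .toInt
    Sizes(int("minPoolSize"), int("initialPoolSize"), int("maxPoolSize"))
  }

  // Connections opened at startup, assuming one pool per executor JVM
  def atStartup(s: Sizes, executors: Int): Int = executors * s.initialPoolSize

  // Worst case once every pool has grown to maxPoolSize
  def worstCase(s: Sizes, executors: Int): Int = executors * s.maxPoolSize

  // Returns the problems found; an empty list means the settings look consistent
  def check(s: Sizes, executors: Int, serverMaxConnections: Int): List[String] = {
    val problems = List.newBuilder[String]
    if (!(s.minPoolSize <= s.initialPoolSize && s.initialPoolSize <= s.maxPoolSize))
      problems += "expected minPoolSize <= initialPoolSize <= maxPoolSize"
    if (worstCase(s, executors) > serverMaxConnections)
      problems += s"$executors executors x maxPoolSize ${s.maxPoolSize} exceeds max_connections $serverMaxConnections"
    problems.result()
  }

  // Parses key=value text in java.util.Properties format
  def load(text: String): Properties = {
    val p = new Properties()
    p.load(new StringReader(text))
    p
  }
}
```

For example, 20 executors with initialPoolSize=5 open 100 connections at startup, and with maxPoolSize=10 they can reach 200, already above MySQL's default limit.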
.foreachPartition(it => {
  import java.util.{Calendar, Date, UUID}

  // One connection and one prepared statement per partition, not per record.
  // comboPooledDataSource is the data source built by getC3p0DataSource.
  val conn = comboPooledDataSource.getConnection
  conn.setAutoCommit(false) // commit the whole partition as one transaction
  val statement = conn.prepareStatement(
    "insert into tb_eventclass_min values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
  try {
    it.foreach(x => {
      statement.setString(1, UUID.randomUUID().toString)
      statement.setLong(2, x._1._1.toString.toLong)
      statement.setLong(3, x._1._2.toString.toLong)
      statement.setString(4, x._1._3.toString)
      statement.setString(5, x._1._4.toString)
      statement.setString(6, x._1._5.toString)
      statement.setString(7, x._1._6.toString)
      statement.setString(8, x._1._7.toString)
      statement.setString(9, x._1._8.toString)
      statement.setLong(10, x._2)
      statement.setShort(11, x._1._10.toString.toShort)
      // x._1._9 is an epoch timestamp in milliseconds; column 12 is the minute bucket
      statement.setLong(12, x._1._9 / 60000L)
      // Calendar.getInstance() uses the executor JVM's default time zone
      val calendar = Calendar.getInstance()
      calendar.setTime(new Date(x._1._9))
      statement.setInt(13, calendar.get(Calendar.YEAR))
      statement.setInt(14, calendar.get(Calendar.MONTH) + 1) // MONTH is 0-based
      statement.setInt(15, calendar.get(Calendar.DAY_OF_MONTH))
      statement.setInt(16, calendar.get(Calendar.HOUR_OF_DAY))
      statement.setInt(17, calendar.get(Calendar.MINUTE))
      statement.addBatch()
    })
    statement.executeBatch()
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback() // discard this partition's uncommitted rows
      throw e // let Spark fail and retry the task instead of silently losing data
  } finally {
    statement.close()
    conn.close() // returns the connection to the pool
  }
})
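Because Calendar.getInstance() depends on the executor JVM's default time zone, the year/month/day/hour/minute columns can differ between clusters. If that matters, the same fields can be derived with java.time and an explicit zone. A sketch; MinuteFields is a hypothetical name, and which zone is correct depends on how the table is meant to be read.

```scala
import java.time.{Instant, ZoneId}

object MinuteFields {
  // Values bound to parameters 12-17 of the INSERT statement
  final case class Fields(minuteBucket: Long, year: Int, month: Int, day: Int, hour: Int, minute: Int)

  // Derives the minute bucket and calendar fields from epoch milliseconds in a given zone
  def from(epochMillis: Long, zone: ZoneId): Fields = {
    val t = Instant.ofEpochMilli(epochMillis).atZone(zone)
    Fields(
      minuteBucket = epochMillis / 60000L, // same bucketing as the original code
      year = t.getYear,
      month = t.getMonthValue, // already 1-12, no +1 needed
      day = t.getDayOfMonth,
      hour = t.getHour,
      minute = t.getMinute
    )
  }
}
```

The same instant, 1700000000000 ms, falls on 2023-11-14 22:13 in UTC but on 2023-11-15 06:13 in Asia/Shanghai, which is exactly the kind of difference an implicit default zone hides.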
There are four things to note here:
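1. Use the foreachPartition operator to reduce the number of database connections
dataSource.getConnection is then called once per partition instead of once per record, so the number of connections matches the number of partitions and stays small.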
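A local simulation (plain Scala collections, no Spark) of the difference between borrowing a connection per record and per partition. CountingSource is a hypothetical stand-in for the data source that only counts getConnection calls; the nested Seqs play the role of partitions.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ConnectionCounting {
  // Hypothetical stand-in for a data source: it only counts getConnection calls
  final class CountingSource {
    val opened = new AtomicInteger(0)
    def getConnection(): AutoCloseable = {
      opened.incrementAndGet()
      new AutoCloseable { def close(): Unit = () }
    }
  }

  // foreach-style: a connection is borrowed for every single record
  def perRecord[A](partitions: Seq[Seq[A]], src: CountingSource)(write: A => Unit): Unit =
    partitions.foreach(_.foreach { record =>
      val conn = src.getConnection()
      try write(record) finally conn.close()
    })

  // foreachPartition-style: one connection serves every record in the partition
  def perPartition[A](partitions: Seq[Seq[A]], src: CountingSource)(write: A => Unit): Unit =
    partitions.foreach { partition =>
      val conn = src.getConnection()
      try partition.foreach(write) finally conn.close()
    }
}
```

With 10 records spread over 3 partitions, the per-record style borrows 10 connections and the per-partition style borrows 3.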
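2. Use batch inserts to improve throughput
Batching also has to be enabled in the driver: with MySQL Connector/J, append rewriteBatchedStatements=true to the JDBC URL. This lets the driver rewrite the batched inserts into multi-row INSERT statements instead of sending them to the server one at a time.
val dbUrl = "jdbc:mysql://localhost:3306/User?rewriteBatchedStatements=true"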
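Two small helpers, both hypothetical: withBatchRewrite adds the flag to a URL that may or may not already have query parameters, and writeInChunks splits a partition into bounded batches. The code above calls executeBatch() once for the whole partition, so a very large partition is held in a single batch; flushing every batchSize rows keeps that bounded. The flush callback stands in for addBatch() on each row followed by executeBatch(), which needs a live database and is not shown.

```scala
object Batching {
  // Appends rewriteBatchedStatements=true to a MySQL JDBC URL unless it is already set
  def withBatchRewrite(url: String): String =
    if (url.contains("rewriteBatchedStatements=")) url
    else if (url.contains("?")) s"$url&rewriteBatchedStatements=true"
    else s"$url?rewriteBatchedStatements=true"

  // Splits a partition's rows into chunks of at most batchSize and passes each
  // chunk to flush; returns how many times flush was called
  def writeInChunks[A](rows: Iterator[A], batchSize: Int)(flush: Seq[A] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var flushes = 0
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk)
      flushes += 1
    }
    flushes
  }
}
```

With 2500 rows and batchSize = 1000, flush runs three times, with 1000, 1000 and 500 rows.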
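3. Turn off auto-commit to avoid deadlocks: each partition's batch is then committed once, as a single transaction, rather than row by row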
conn.setAutoCommit(false)
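With auto-commit off, committing and rolling back become the caller's job. A minimal helper, assuming plain java.sql.Connection; withTransaction is a hypothetical name. It restores the previous auto-commit mode in finally because a pooled connection may be reused by the next borrower.

```scala
import java.sql.Connection

object Tx {
  // Runs body with auto-commit off: one commit on success, a rollback on failure.
  // The previous auto-commit mode is restored afterwards.
  def withTransaction[T](conn: Connection)(body: Connection => T): T = {
    val previous = conn.getAutoCommit
    conn.setAutoCommit(false)
    try {
      val result = body(conn)
      conn.commit()
      result
    } catch {
      case e: Throwable =>
        // Keep the original failure; attach a rollback failure to it if there is one
        try conn.rollback() catch { case r: Throwable => e.addSuppressed(r) }
        throw e
    } finally {
      conn.setAutoCommit(previous)
    }
  }
}
```

Inside foreachPartition, the body would add all rows to the batch and call executeBatch(); the helper then owns the commit, the rollback and the rethrow.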
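4. Close the statement and the connection when you are done
Statements that are never closed keep accumulating and consuming memory, and closing a pooled connection returns it to the pool rather than tearing it down.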
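In Scala 2.13 and later, scala.util.Using.Manager closes everything passed to it in reverse order of acquisition, even when the body throws, which covers point 4 without hand-written finally blocks. The sketch uses a hypothetical Tracked resource that logs its own close() so the order can be checked; with real JDBC objects, the connection and the prepared statement would be passed to use(...) instead.

```scala
import scala.collection.mutable
import scala.util.{Try, Using}

object Closing {
  // Hypothetical resource that records when it is closed
  final class Tracked(name: String, log: mutable.Buffer[String]) extends AutoCloseable {
    def close(): Unit = { log += s"close $name"; () }
  }

  // Opens a "connection" and then a "statement"; Using.Manager closes them in
  // reverse order (statement first) whether the body succeeds or throws
  def run(log: mutable.Buffer[String])(body: => Unit): Try[Unit] =
    Using.Manager { use =>
      use(new Tracked("connection", log))
      use(new Tracked("statement", log))
      body
    }
}
```

Failures come back as a Failure instead of being thrown, so inside a Spark task the result should be rethrown (for example with .get) if the task is meant to fail.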