在数据分析领域,数据库是我们的好帮手。不仅可以接受我们的查询时间,还可以在这基础上做进一步分析。所以,我们必然要在数据库插入数据。在实际应用中,我们经常遇到千万级,甚至更大的数据量。如果没有一个快速的插入方法,则会事倍功半,花费大量的时间。
在参加阿里的天池大数据算法竞赛中(流行音乐趋势预测),我遇到了这样的问题,在没有优化数据库查询及插入之前,我花了不少冤枉时间,没有优化之前,1500万条数据,光插入操作就花费了不可思议的12个小时以上(使用最基本的逐条插入)。这也促使我思考怎样优化数据库插入及查询操作,提高效率。
在不断优化过程中,性能有大幅提升。在按时间序列从数据库查询并汇总生成2万6000多首歌曲的下载,播放,收藏数过程中,通过查询生成的操作速度提高从预估的40多小时降低到一小时多。在数据库插入方面,性能得到大幅提升;在新的数据集上测试,5490万+的数据,20分钟完成了插入。下面分享一下我的心得。
优化过程分为2步。第一步,实验静态reader从CSV文件读取数据,达到一定量时,开始多线程插入数据库程序;第二步,使用mysq批量插入操作。
第一步,读取文件,开始插入多线程
在这里,达到一定量的量是个需要斟酌的问题,在我的实验中,开始使用100w作为这个量,但是出现了新的问题,Java 堆内存溢出,最终采用了10W作为量的标准。
当然,可以有其他的量,看大家自己喜欢那个了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import preprocess.ImportDataBase;
public class MuiltThreadImportDB {
/**
* Java多线程读大文件并入库
*
* @param args
*/
private static int m_record = 99999 ;
private static BufferedReader br = null ;
private ArrayList<String> list;
private static int m_thread = 0 ;
static {
try {
br = new BufferedReader(
new FileReader(
"E:/tianci/IJCAI15 Data/data_format1/user_log_format1.csv" ), 8192 );
} catch (FileNotFoundException e) {
e.printStackTrace();
}
try {
br.readLine(); // 去掉CSV Header
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
String line;
int count = 0 ;
list = new ArrayList<String>(m_record + 1 );
synchronized (br) {
try {
while ((line = br.readLine()) != null ) {
if (count < m_record) {
list.add(line);
count++;
} else {
list.add(line);
count = 0 ;
Thread t1 = new Thread( new MultiThread(list),Integer.toString(m_thread++));
t1.start();
list = new ArrayList<String>(m_record + 1 );
}
}
if (list != null ) {
Thread t1 = new Thread( new MultiThread(list),Integer.toString(m_thread++));
t1.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new MuiltThreadImportDB().start();
}
}
|
第二步,使用多线程,批量插入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
class MultiThread implements Runnable {
private ArrayList<String> list;
public MultiThread(ArrayList<String> list) {
this .list = list;
}
public void run() {
try {
ImportDataBase insert = new ImportDataBase(list);
insert.start();
} catch (FileNotFoundException e) {
e.printStackTrace();
}
display( this .list);
}
public void display(List<String> list) {
// for (String str : list) {
// System.out.println(str);
// }
System.out.print(Thread.currentThread().getName() + " :" );
System.out.println(list.size());
}
}
|
批量操作中,使用mysql的prepareStatement类,当然也使用了statement类的批量操作,性能比不上前者。前者可以达到1w+每秒的插入速度,后者只有2000+;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public int insertUserBehaviour(ArrayList<String> sqls) throws SQLException {
String sql = "insert into user_behaviour_log (user_id,item_id,cat_id,merchant_id,brand_id,time_stamp,action_type)"
+ " values(?,?,?,?,?,?,?)" ;
preStmt = conn.prepareStatement(sql);
for ( int i = 0 ; i < sqls.size(); i++) {
UserLog log = new UserLog(sqls.get(i));
preStmt.setString( 1 , log.getUser_id());
preStmt.setString( 2 , log.getItem_id());
preStmt.setString( 3 , log.getCat_id());
preStmt.setString( 4 , log.getMerchant_id());
preStmt.setString( 5 , log.getBrand_id());
preStmt.setString( 6 , log.getTimeStamp());
preStmt.setString( 7 , log.getActionType());
preStmt.addBatch();
if ((i + 1 ) % 10000 == 0 ) {
preStmt.executeBatch();
conn.commit();
preStmt.clearBatch();
}
}
preStmt.executeBatch();
conn.commit();
return 1 ;
}
|
当然,也实验了不同的mysql存储引擎,InnoDB和MyISM,实验结果发现,InnoDB更快(3倍左右),可能和mysq的新版本有关系,笔者的mysql版本是5.6。
最后总结一下,大数据量下,提高插入速度的方法。
Java代码方面,使用多线程插入,并且使用批处理提交。
数据库方面,表结构建立时不要使用索引,要不然插入过程过还要维护索引B+树;修改存储引擎,一般默认是InnoDB,(新版本就使用默认就可以,老版本可能需要)。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/oldbai001/article/details/51693139