最近与同事研究一题目:通过多线程处理大数据文件,数据文件大小为1G,文件格式为bin格式(纯文本,utf-8编码方式)。
主要要进行的工作是:通过多线程解析,每一条记录保存到mysql数据库中(查看文件可得500W条数据),线程数量要可配置,最重要要保证效率。
最主体的思路流程是:找一小体积bin文件,通过I/O读取,然后插入到数据库中;成功后针对这一流程慢慢进行修改。
于是,百度了一下bin文件的定义(请原谅我的无知),如下:
二进制文件,其用途依系统或应用而定 。一种文件格式binary的缩写。一个后缀名为".bin"的文件,只是表明它是binary格式。比如虚拟光驱文件常用。bin作为后缀,但并不意味着所有,bin文件都是虚拟光驱文件。 好吧,看了开头的五个字就懒得看下去了:二进制文件。 考虑到文件的大小,神马都不用说了,直接使用BufferedReader吧。 第一步读取文件的时候发生了点小意外,因为我随便找了个bin后缀的文件,文本里面包含中文字符,读取后到控制台输出乱码了,后来找了下资料,找到如下方法解决了:String newStr = new String(oldStr.getBytes(), "UTF-8");
以下是方法解释,新技能GET!
java中的String类是按照unicode进行编码的,当使用String(byte[] bytes, String encoding)构造字符串时,encoding所指的是bytes中的数据是按照那种方式编码的,而不是最后产生的String是什么编码方式,换句话说,是让系统把bytes中的数据由encoding编码方式转换成unicode编码。如果不指明,bytes的编码方式将由jdk根据操作系统决定。
当主体框架完成后,就开始思考实现方式了,于是,我们想到了以下三种实现方式:
第一种:啥都不说,声明静态块(BufferedReader);加锁(synchronized),多线程操作(一个线程需要完成的工作是:读取+解析+插入);
第二种:考虑了一下第一种方法的效率问题,加入线程池,思路是:主线程读取,每读一行,new一个线程(这里一个线程主要完成的工作是:解析+插入);再把线程放入线程池;
第三种:我们觉得比较前卫的方法,使用线程池+数据库连接池,把大数据文件按照配置的线程个数进行等量分隔,各自的线程分别操作各自的小文件,这里主线程要进行的操作是分割文件,每个线程要进行的是(读取+解析+插入),但是各线程需要处理的文件变小了,从而提高效率。
下面针对这三种方法一一解释:
在最开始的时候,遇到了一点小问题,数据文件里每一个字段是用箭头(→)进行分割的,如果直接使用split方法进行分割解析的话,会出现乱码,所以只能把字符转换为ASCII,再根据箭头的ASCII进行分割;
String转ASCII简单,直接使用String.toCharArray()转换为char数组;判断时直接把char强制类型转换为int类型即可;
以下为根据ASCII判断进行分割代码:(aList为每一行中的字段列表)
aList = new ArrayList<String>();
char[] chars = s.toCharArray();
int j = -1;
// 判断两个箭头的位置,然后把值放在里面
for (int i = 0; i < chars.length; i++) {
if ((int) chars[i] == 27) {
String bbb = "";
for (int k = j + 1; k < i; k++) {
bbb += chars[k];
}
aList.add(bbb);
j = i;
}
if (i == chars.length - 1) {
String bbb = "";
for (int k = j + 1; k <= i; k++) {
bbb += chars[k];
}
aList.add(bbb);
}
}
除此之外,读取文件的时候还受到了文件内空格的影响,后来使用trim()方法把空格去掉了,唉,旧技能遗忘了......
排除了以上两个小问题后,第一种方法很容易就写完了,代码如下:
public class DataFile implements Runnable {
private static BufferedReader reader = null;
private static int size = 0;
static {
try {
reader = new BufferedReader(new FileReader("xxxx.bin"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
String s = "";// 用来存放读出的数据
List<String> aList = new ArrayList<String>();
MingxiDAO mxDAO = new MingxiDAO();
int count = 0;
while (true) {
synchronized (reader) {
try {
while ((s = reader.readLine()
+ reader.readLine().trim()) != null) {
aList = new ArrayList<String>();
char[] chars = s.toCharArray();
if (count < 15) {
int j = -1;
// 判断两个箭头的位置,然后把值放在里面
for (int i = 0; i < chars.length; i++) {
if ((int) chars[i] == 27) {
String bbb = "";
for (int k = j + 1; k < i; k++) {
bbb += chars[k];
}
aList.add(bbb);
j = i;
}
if (i == chars.length - 1) {
String bbb = "";
for (int k = j + 1; k <= i; k++) {
bbb += chars[k];
}
aList.add(bbb);
}
}
count++;
} else {
int j = -1;
// 判断两个箭头的位置,然后把值放在里面
for (int i = 0; i < chars.length; i++) {
if ((int) chars[i] == 27) {
String bbb = "";
for (int k = j + 1; k < i; k++) {
bbb += chars[k];
}
aList.add(bbb);
j = i;
}
if (i == chars.length - 1) {
String bbb = "";
for (int k = j + 1; k <= i; k++) {
bbb += chars[k];
}
aList.add(bbb);
}
}
count = 0;
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1);
setSize(getSize() + 1);
System.out.println(Thread.currentThread().getName()+"==:size----------->" + getSize());
mxDAO.poolInsert(aList);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (s == null)
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized int getSize() {
return size;
}
public synchronized void setSize(int size) {
DataFile.size = size;
}
}
主线程(main函数)直接new线程即可;
这种方法最简单,但是缺点主要有两点:
一.加锁导致效率极其低下,500W条数据谈何容易!
二.不加锁,效率明显提高,但是各线程抢资源,无法有效判断每一行数据是否已经读取,造成数据大量重复。
结论:第一种方法,out!
第二种方法,其实与第一种区别不大,使用了线程池,这里简单写下线程池的用法,新技能GET!
//声明含5个线程的线程池
ExecutorService pool = Executors.newScheduledThreadPool(5);
Thread t1 = new Thread();
pool.execute(t1);
pool.shutdown();
下面贴出第二种方法的代码:
public class Start {
private static BufferedReader reader = null;
MingxiDAO mxDAO = new MingxiDAO();
static {
try {
reader = new BufferedReader(new FileReader("xxxx.bin"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int u = 0;
String s = "";// 用来存放读出的数据
ExecutorService pool = Executors.newScheduledThreadPool(5);
try {
while ((s = reader.readLine() + reader.readLine().trim()) != null) {
char[] chars = s.toCharArray();
Threads thr = new Threads(chars);
Thread t1 = new Thread(thr);
pool.execute(t1);
u++;
System.out.println(u);
}
}catch (Exception e) {
e.printStackTrace();
}
pool.shutdown();
}
}
public class Threads implements Runnable {
public char[] chars;
public Threads(char[] chars) {
this.chars = chars;
}
@Override
public void run() {
List<String> aList = new ArrayList<String>();
MingxiDAO mxDAO = new MingxiDAO();
int j = -1;
// 判断两个箭头的位置,然后把值放在里面
for (int i = 0; i < chars.length; i++) {
if ((int) chars[i] == 27) {
String bbb = "";
for (int k = j + 1; k < i; k++) {
bbb += chars[k];
}
aList.add(bbb);
j = i;
}
if (i == chars.length - 1) {
String bbb = "";
for (int k = j + 1; k <= i; k++) {
bbb += chars[k];
}
aList.add(bbb);
}
}
try {
mxDAO.poolInsert(aList);
} catch (Exception e) {
e.printStackTrace();
}
}
}
这种方法虽然不需要加锁了,但是效率还是太低了,而且频繁创建线程,直接out!
这样看来,不能不使用第三种方法去尝试了,第三种方法加入了数据库连接池提高效率,啥都不说,直接百度呗,难道还有闲情自己写一个数据库连接池?
以下是百度得来的实现代码:
public class ConnectionPool {
private String jdbcDriver = ""; // 数据库驱动
private String dbUrl = ""; // 数据 URL
private String dbUsername = ""; // 数据库用户名
private String dbPassword = ""; // 数据库用户密码
private String testTable = ""; // 测试连接是否可用的测试表名,默认没有测试表
private int initialConnections = 1; // 连接池的初始大小
private int incrementalConnections = 5;// 连接池自动增加的大小
private int maxConnections = 20; // 连接池最大的大小
private Vector<PooledConnection> connections = null; // 存放连接池中数据库连接的向量 ,
// 初始时为 null
// 它中存放的对象为 PooledConnection 型
/**
*
* 构造函数
*
*
*
* @param jdbcDriver
* String JDBC 驱动类串
*
* @param dbUrl
* String 数据库 URL
*
* @param dbUsername
* String 连接数据库用户名
*
* @param dbPassword
* String 连接数据库用户的密码
*
*
*/
public ConnectionPool(String jdbcDriver, String dbUrl, String dbUsername, String dbPassword) {
this.jdbcDriver = jdbcDriver;
this.dbUrl = dbUrl;
this.dbUsername = dbUsername;
this.dbPassword = dbPassword;
try {
createPool();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* 返回连接池的初始大小
*
*
*
* @return 初始连接池中可获得的连接数量
*/
public int getInitialConnections() {
return this.initialConnections;
}
/**
*
* 设置连接池的初始大小
*
*
*
* @param 用于设置初始连接池中连接的数量
*/
public void setInitialConnections(int initialConnections) {
this.initialConnections = initialConnections;
}
/**
*
* 返回连接池自动增加的大小 、
*
*
*
* @return 连接池自动增加的大小
*/
public int getIncrementalConnections() {
return this.incrementalConnections;
}
/**
*
* 设置连接池自动增加的大小
*
* @param 连接池自动增加的大小
*/
public void setIncrementalConnections(int incrementalConnections) {
this.incrementalConnections = incrementalConnections;
}
/**
*
* 返回连接池中最大的可用连接数量
*
* @return 连接池中最大的可用连接数量
*/
public int getMaxConnections() {
return this.maxConnections;
}
/**
*
* 设置连接池中最大可用的连接数量
*
*
*
* @param 设置连接池中最大可用的连接数量值
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
/**
*
* 获取测试数据库表的名字
*
*
*
* @return 测试数据库表的名字
*/
public String getTestTable() {
return this.testTable;
}
/**
*
* 设置测试表的名字
*
* @param testTable
* String 测试表的名字
*/
public void setTestTable(String testTable) {
this.testTable = testTable;
}
/**
*
*
*
* 创建一个数据库连接池,连接池中的可用连接的数量采用类成员
*
* initialConnections 中设置的值
*/
public synchronized void createPool() throws Exception {
// 确保连接池没有创建
// 如果连接池己经创建了,保存连接的向量 connections 不会为空
if (connections != null) {
return; // 如果己经创建,则返回
}
// 实例化 JDBC Driver 中指定的驱动类实例
Driver driver = (Driver) (Class.forName(this.jdbcDriver).newInstance());
DriverManager.registerDriver(driver); // 注册 JDBC 驱动程序
// 创建保存连接的向量 , 初始时有 0 个元素
connections = new Vector<PooledConnection>();
// 根据 initialConnections 中设置的值,创建连接。
createConnections(this.initialConnections);
System.out.println("create pool");
}
/**
*
* 创建由 numConnections 指定数目的数据库连接 , 并把这些连接
*
* 放入 connections 向量中
*
*
*
* @param numConnections
* 要创建的数据库连接的数目
*/
private void createConnections(int numConnections) throws SQLException {
// 循环创建指定数目的数据库连接
for (int x = 0; x < numConnections; x++) {
// 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections
// 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
// 如果连接数己经达到最大,即退出。
System.out.println(this.connections.size() + "," + this.maxConnections);
if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {
System.out.println("连接数己经达到最大");
break;
}
// add a new PooledConnection object to connections vector
// 增加一个连接到连接池中(向量 connections 中)
try {
connections.addElement(new PooledConnection(newConnection()));
} catch (SQLException e) {
System.out.println(" 创建数据库连接失败! " + e.getMessage());
throw new SQLException();
}
System.out.println(" 数据库连接己创建 ......");
}
}
/**
*
* 创建一个新的数据库连接并返回它
*
*
*
* @return 返回一个新创建的数据库连接
*/
private Connection newConnection() throws SQLException {
// 创建一个数据库连接
Connection conn = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);
// 如果这是第一次创建数据库连接,即检查数据库,获得此数据库允许支持的
// 最大客户连接数目
// connections.size()==0 表示目前没有连接己被创建
if (connections.size() == 0) {
DatabaseMetaData metaData = conn.getMetaData();
int driverMaxConnections = metaData.getMaxConnections();
// 数据库返回的 driverMaxConnections 若为 0 ,表示此数据库没有最大
// 连接限制,或数据库的最大连接限制不知道
// driverMaxConnections 为返回的一个整数,表示此数据库允许客户连接的数目
// 如果连接池中设置的最大连接数量大于数据库允许的连接数目 , 则置连接池的最大
// 连接数目为数据库允许的最大数目
if (driverMaxConnections > 0 && this.maxConnections > driverMaxConnections) {
this.maxConnections = driverMaxConnections;
}
}
return conn; // 返回创建的新的数据库连接
}
/**
*
* 通过调用 getFreeConnection() 函数返回一个可用的数据库连接 ,
*
* 如果当前没有可用的数据库连接,并且更多的数据库连接不能创
*
* 建(如连接池大小的限制),此函数等待一会再尝试获取。
*
*
*
* @return 返回一个可用的数据库连接对象
*/
public synchronized PooledConnection getConnection() throws SQLException {
// 确保连接池己被创建
if (connections == null) {
return null; // 连接池还没创建,则返回 null
}
PooledConnection conn = getFreeConnection(); // 获得一个可用的数据库连接
// 如果目前没有可以使用的连接,即所有的连接都在使用中
while (conn == null) {
// 等一会再试
wait(250);
conn = getFreeConnection(); // 重新再试,直到获得可用的连接,如果
// getFreeConnection() 返回的为 null
// 则表明创建一批连接后也不可获得可用连接
}
return conn;// 返回获得的可用的连接
}
/**
*
* 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果
*
* 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置
*
* 的值创建几个数据库连接,并放入连接池中。
*
* 如果创建后,所有的连接仍都在使用中,则返回 null
*
* @return 返回一个可用的数据库连接
*/
public void print() {
System.out.println("total connection:" + connections.size());
int i = 1;
for (PooledConnection conn : connections) {
System.out.println("---" + i + ":" + conn.isBusy());
}
}
private PooledConnection getFreeConnection() throws SQLException {
// 从连接池中获得一个可用的数据库连接
PooledConnection conn = findFreeConnection();
if (conn == null) {
// 如果目前连接池中没有可用的连接
// 创建一些连接
System.out.println("目前连接池中没有可用的连接,创建一些连接 ");
createConnections(incrementalConnections);
// 重新从池中查找是否有可用连接
conn = findFreeConnection();
if (conn == null) {
// 如果创建连接后仍获得不到可用的连接,则返回 null
return null;
}
}
return conn;
}
/**
*
* 查找连接池中所有的连接,查找一个可用的数据库连接,
*
* 如果没有可用的连接,返回 null
*
*
*
* @return 返回一个可用的数据库连接
*/
private PooledConnection findFreeConnection() throws SQLException {
// 获得连接池向量中所有的对象
for (int i = 0; i < connections.size(); i++) {
PooledConnection pc = connections.elementAt(i);
// System.out.println("pConn.isBusy():"+pConn.isBusy());
if (!pc.isBusy()) {
// 如果此对象不忙,则获得它的数据库连接并把它设为忙
Connection conn = pc.getConnection();
pc.setBusy(true);
// 测试此连接是否可用
if (!isValid(conn)) {
// 如果此连接不可再用了,则创建一个新的连接,
// 并替换此不可用的连接对象,如果创建失败,删除该无效连接,遍历下一个不忙连接
try {
conn = newConnection();
pc.setConnection(conn);
} catch (SQLException e) {
e.printStackTrace();
connections.remove(i--);
continue;
}
}
return pc; // 己经找到一个可用的连接,退出
}
}
return null;// 返回找到到的可用连接
}
/**
*
* 测试一个连接是否可用,如果不可用,关掉它并返回 false
*
* 否则可用返回 true
*
*
*
* @param conn
* 需要测试的数据库连接
*
* @return 返回 true 表示此连接可用, false 表示不可用
*/
private boolean isValid(Connection conn) {
try {
return conn.isValid(3000);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
}
// try {
//
// // 判断测试表是否存在
//
// if (testTable.equals("")) {
//
// // 如果测试表为空,试着使用此连接的 setAutoCommit() 方法
//
// // 来判断连接否可用(此方法只在部分数据库可用,如果不可用 ,
//
// // 抛出异常)。注意:使用测试表的方法更可靠
//
// conn.setAutoCommit(true);
//
// } else {// 有测试表的时候使用测试表测试
//
// // check if this connection is valid
//
// Statement stmt = conn.createStatement();
//
// stmt.execute("select count(*) from " + testTable);
//
// }
//
// } catch (SQLException e) {
//
// // 上面抛出异常,此连接己不可用,关闭它,并返回 false;
//
// closeConnection(conn);
//
// return false;
//
// }
//
// // 连接可用,返回 true
//
// return true;
}
/**
*
* 此函数返回一个数据库连接到连接池中,并把此连接置为空闲。
*
* 所有使用连接池获得的数据库连接均应在不使用此连接时返回它。
*
*
*
* @param 需返回到连接池中的连接对象
*/
public void returnConnection(Connection conn) {
// 确保连接池存在,如果连接没有创建(不存在),直接返回
if (connections == null) {
System.out.println(" 连接池不存在,无法返回此连接到连接池中 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍历连接池中的所有连接,找到这个要返回的连接对象
while (enumerate.hasMoreElements()) {
pConn = (PooledConnection) enumerate.nextElement();
// 先找到连接池中的要返回的连接对象
if (conn == pConn.getConnection()) {
// 找到了 , 设置此连接为空闲状态
pConn.setBusy(false);
break;
}
}
}
/**
*
* 刷新连接池中所有的连接对象
*
*
*/
public synchronized void refreshConnections() throws SQLException {
// 确保连接池己创新存在
if (connections == null) {
System.out.println(" 连接池不存在,无法刷新 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
// 获得一个连接对象
pConn = (PooledConnection) enumerate.nextElement();
// 如果对象忙则等 5 秒 ,5 秒后直接刷新
if (pConn.isBusy()) {
wait(5000); // 等 5 秒
}
// 关闭此连接,用一个新的连接代替它。
closeConnection(pConn.getConnection());
pConn.setConnection(newConnection());
pConn.setBusy(false);
}
}
/**
*
* 关闭连接池中所有的连接,并清空连接池。
*/
public synchronized void closeConnectionPool() throws SQLException {
// 确保连接池存在,如果不存在,返回
if (connections == null) {
System.out.println(" 连接池不存在,无法关闭 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
pConn = (PooledConnection) enumerate.nextElement();
// 如果忙,等 5 秒
if (pConn.isBusy()) {
wait(5000); // 等 5 秒
}
// 5 秒后直接关闭它
closeConnection(pConn.getConnection());
// 从连接池向量中删除它
connections.removeElement(pConn);
}
// 置连接池为空
connections = null;
}
/**
*
* 关闭一个数据库连接
*
*
*
* @param 需要关闭的数据库连接
*/
private void closeConnection(Connection conn) {
try {
conn.close();
} catch (SQLException e) {
System.out.println(" 关闭数据库连接出错: " + e.getMessage());
}
}
/**
*
* 使程序等待给定的毫秒数
*
*
*
* @param 给定的毫秒数
*/
private void wait(int mSeconds) {
try {
Thread.sleep(mSeconds);
} catch (InterruptedException e) {
}
}
/**
*
* 内部使用的用于保存连接池中连接对象的类
*
* 此类中有两个成员,一个是数据库的连接,另一个是指示此连接是否
*
* 正在使用的标志。
*/
public class PooledConnection {
private Connection connection = null;// 数据库连接
private boolean busy ; // 此连接是否正在使用的标志,默认没有正在使用
// 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
private PooledConnection(Connection connection) {
this.connection = connection;
}
public ResultSet executeQuery(String sql) throws SQLException {
return connection.createStatement().executeQuery(sql);
}
public int executeUpdate(String sql) throws SQLException {
return connection.createStatement().executeUpdate(sql);
}
// 返回此对象中的连接
private Connection getConnection() {
return connection;
}
// 设置此对象的,连接
private void setConnection(Connection connection) {
this.connection = connection;
}
// 获得对象连接是否忙
private boolean isBusy() {
return busy;
}
// 设置对象的连接正在忙
private void setBusy(boolean busy) {
// System.out.println("set to busy:"+busy);
this.busy = busy;
}
public void close() {
busy = false;
}
}
}
public class DBManager {
private static PooledConnection conn;
private static ConnectionPool connectionPool;
private static DBManager inst;
public void close() {
try {
connectionPool.closeConnectionPool();
} catch (SQLException e) {
e.printStackTrace();
}
}
public DBManager() {
if (inst != null)
return;
String connStr = String.format("jdbc:mysql://%s:%d/%s", "localhost", 3306,
"xxxx");
connectionPool = new ConnectionPool("com.mysql.jdbc.Driver", connStr, "username", "password");
try {
connectionPool.createPool();
inst = this;
} catch (Exception e) {
e.printStackTrace();
}
}
public static PooledConnection getConnection() {
if (inst == null)
new DBManager();
try {
conn = connectionPool.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
}
下面,贴出第三种方法的实现代码:
public class BeanRunApp {
/**
* Method main
*
*
* @param args
*
*/
public static void main(String[] args) throws IOException {
// 写入文件的路径
String filePath = "D:\\123";
// 切分文件的路径
String sqlitFilePath = "D:\\123\\fg";
Scanner sca = new Scanner(System.in);
System.out.println("请输入线程个数:");
String line = sca.nextLine();
// 子文件的个数,线程个数
int CountFile = Integer.parseInt(line);
// 精度
long startNumber = System.currentTimeMillis();
System.out.println("正在进行文件分割,请稍后......");
// 将大数据文件切分到另外的小文件中
sqlitFileDate(filePath, sqlitFilePath, CountFile);
System.out.println("文件切割完毕!");
long stopNumber = System.currentTimeMillis();
System.out.println("文件切割耗时" + (stopNumber - startNumber) / 1000 + "秒");
String[] a = new String[CountFile];
for(int i = 0; i < CountFile; i++ ){
int j = i + 1;
a[i] = "D:\\123\\fg\\12" + j + ".bin";
}
ExecutorService pool = Executors.newScheduledThreadPool(CountFile);
for (int i = 0; i < a.length; i++) // 用数组的长度来作为循环条件
{ // 把这个数组和i的值作为构造函数传入线程类
MyThread myth = new MyThread(a, i, CountFile);
System.out.println("--------------------------------");
//myth.start(); // 执行
pool.execute(myth);
System.out.println("当前的线程是:" + myth.getName());
}
pool.shutdown();
}
// 将大数据文件切分到另外的小文件中
public static void sqlitFileDate(String filepath, String sqlitPath,
int CountFile) throws IOException {
FileWriter fs = null;
BufferedWriter fw = null;
FileReader fr = new FileReader(filepath + "\\xxxx.bin");
BufferedReader br = new BufferedReader(fr); // 读取获取整行数据
int i = 1;
LinkedList<FileWriter> WriterLists = new LinkedList<FileWriter>(); // 初始化文件流对象集合
LinkedList<BufferedWriter> fwLists = new LinkedList<BufferedWriter>();
for (int j = 1; j <= CountFile; j++) {
// 声明对象
fs = new FileWriter(sqlitPath + "\\12" + j + ".bin", false);
fw = new BufferedWriter(fs);
// 将对象装入集合
WriterLists.add(fs);
fwLists.add(fw);
}
// 判断是文件流中是否还有数据返回
while (br.ready()) {
int count = 1;// 初始化第一文件流
for (Iterator<BufferedWriter> iterator = fwLists.iterator(); iterator.hasNext();) {
BufferedWriter type = (BufferedWriter) iterator.next();
if (i == count)// 判断轮到第几个文件流写入数据了
{
// 写入数据,跳出,进行下一个文件流,下一个数据的写入
type.write(br.readLine() + br.readLine().trim() + "\r\n");
break;
}
count++;
}
// 判断是否到了最后一个文件流了
if (i >= CountFile) {
i = 1;
} else
i++;
}
br.close();
fr.close();
for (Iterator<BufferedWriter> iterator = fwLists.iterator(); iterator.hasNext();) {
BufferedWriter object = (BufferedWriter) iterator.next();
object.close();
}
// 遍历关闭所有子文件流
for (Iterator<FileWriter> iterator = WriterLists.iterator(); iterator.hasNext();) {
FileWriter object = (FileWriter) iterator.next();
object.close();
}
}
}
public class MyThread extends Thread {
private int index; // 用于数组的位置
private String[] fileNames; // 定义一个字符串数组
private Read read = new Read();
private int length; //线程数量,子文件数量
public MyThread(String[] fileNames, int index, int length) // index表示数组位置标号
{
this.index = index;
this.fileNames = fileNames;
this.length = length;
}
public void run() {
long startTime = System.currentTimeMillis();
int size = 0;
try {
size = read.read(fileNames[index], index);
} catch (IOException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println("线程" + (index+1) + "耗时" + (endTime - startTime) / 1000 + "秒");
System.out.println("线程" + (index+1) + "的size为:" + size);
int sum = size * length;
System.out.println("总size为:" + sum);
}
}
public class Read {
private int size = 0;
private BufferedReader reader;
private PooledConnection poolConn = null;
public int read(String fileName, int index) throws IOException {
List<String> aList = new ArrayList<String>();
poolConn = DBManager.getConnection();
reader = new BufferedReader(new FileReader(fileName), 100);
try {
String s = "";
while ((s = reader.readLine()) != null) {
aList = new ArrayList<String>();
char[] chars = s.toCharArray();
int j = -1;
// 判断两个箭头的位置,然后把值放在里面
for (int i = 0; i < chars.length; i++) {
if ((int) chars[i] == 27) {
String bbb = "";
for (int k = j + 1; k < i; k++) {
bbb += chars[k];
}
aList.add(bbb);
j = i;
}
if (i == chars.length - 1) {
String bbb = "";
for (int k = j + 1; k <= i; k++) {
bbb += chars[k];
}
aList.add(bbb);
}
}
try {
size++;
poolInsert(aList);
System.out.println("线程" + (index+1) + ":size------->" + size);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}finally{
poolConn.close();
}
return size;
}
public void poolInsert(List<String> list){
String sql = "insert into mingxi values(";
for(int i = 0; i < list.size(); i++){
sql = sql + "'";
sql = sql + list.get(i).toString();
sql = sql + "'";
if(i != list.size() -1 ){
sql += ",";
}
}
sql += ")";
try {
poolConn.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
} finally {
//poolConn.close();
}
}
}
第三种方法的效率非常可观,把500W数据读取,解析到插入数据库,只需要十来二十分钟,但是,缺点还是有的:
一.主线程分割文件效率问题,随着分割的文件数量增多,效率越慢;
二.分割文件后硬盘空间占用问题;
三.最致命的问题,运行程序时,内存要求非常高,JVM默认分配的内存空间远远不够,如果不加量加价,到后面解析时会越来越慢,最后直接卡死;
所以,为了解决第三点的问题,我在Run Configurations中的Arguments框中加入了修改JVM分配内存的参数:-Xms4096m -Xmx4096m;这样,才得以保证了效率。
可是,由于我电脑是8G内存才可以这么奢侈的分配,要是内存小的电脑呢? no law to see!
备注,参数解释:
-Xms是设置内存初始化的大小
-Xmx是设置最大能够使用内存的大小(最好不要超过物理内存大小)
这个问题到此暂且告一段落了,因为还没想到更快更好的办法,要是有大神们有好的思路,麻烦多多指教啊。
最后,为了完美,贴上DAO层的代码吧。(懒得配置,直接使用JDBC了,哈哈)
public class MingxiDAO {
public void poolInsert(List<String> list){
String sql = "insert into mingxi values(";
for(int i = 0; i < list.size(); i++){
sql = sql + "'";
sql = sql + list.get(i).toString();
sql = sql + "'";
if(i != list.size() -1 ){
sql += ",";
}
}
sql += ")";
PooledConnection poolConn = null;
try {
poolConn = DBManager.getConnection();
poolConn.executeUpdate(sql);
} catch (SQLException e) {
e.printStackTrace();
} finally {
poolConn.close();
}
}
}