之前通过 java nio 实现了按行读写文件的方法,但由于是单线程读写,所以当读取数据量比较大的文件时性能肯定会是个大问题,所以为了解决读取处理大文件时性能上的问题,就在原来按行读取文件的基础上增加了多线程的实现,并使用线程来进行线程调度,思路和其它相关博客都一样,只是在代码实现的方式上有所区别,整体思路就是根据初始化线程数量来将文件进行分段读取,一个线程读取一个片段的内容,然后将读取到的行数据交由事先注册的处理接口来进行处理。具体代码如下:
1.文件读取工具类:文件读取的具体实现类,提供读取文件的入口
/**
* 多线程按行读取文件工具类
* @author zyh
*
*/
public class FileReader {
private int threadNum = 3;//线程数,默认为3
private String filePath;//文件路径
private int bufSize = 1024;//缓冲区大小,默认为1024
private DataProcessHandler dataProcessHandler;//数据处理接口
private ExecutorService threadPool;
public FileReader(String filePath,int bufSize,int threadNum){
this.threadNum = threadNum;
this.bufSize = bufSize;
this.filePath = filePath;
this.threadPool = Executors.newFixedThreadPool(threadNum);
}
/**
* 启动多线程读取文件
*/
public void startRead(){
FileChannel infile = null;
try {
RandomAccessFile raf = new RandomAccessFile(filePath,"r");
infile = raf.getChannel();
long size = infile.size();
long subSize = size/threadNum;
for(int i = 0; i < threadNum; i++){
long startIndex = i*subSize;
if(size%threadNum > 0 && i == threadNum - 1){
subSize += size%threadNum;
}
RandomAccessFile accessFile = new RandomAccessFile(filePath,"r");
FileChannel inch = accessFile.getChannel();
threadPool.execute(new MultiThreadReader(inch,startIndex,subSize));
}
threadPool.shutdown();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally{
try {
if(infile != null){
infile.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 注册数据处理接口
* @param dataHandler
*/
public void registerHanlder(DataProcessHandler dataHandler){
this.dataProcessHandler = dataHandler;
}
/**
* 多线程按行读取文件具体实现类
* @author zyh
*
*/
public class MultiThreadReader implements Runnable{
private FileChannel channel;
private long startIndex;
private long rSize;
public MultiThreadReader(FileChannel channel,long startIndex,long rSize){
this.channel = channel;
this.startIndex = startIndex > 0?startIndex - 1:startIndex;
this.rSize = rSize;
}
public void run(){
readByLine();
}
/**
* 按行读取文件实现逻辑
* @return
*/
public void readByLine(){
try {
ByteBuffer rbuf = ByteBuffer.allocate(bufSize);
channel.position(startIndex);//设置读取文件的起始位置
long endIndex = startIndex + rSize;//读取文件数据的结束位置
byte[] temp = new byte[0];//用来缓存上次读取剩下的部分
int LF = "\n".getBytes()[0];//换行符
boolean isEnd = false;//用于判断数据是否读取完
boolean isWholeLine = false;//用于判断第一行读取到的是否是完整的一行
long lineCount = 0;//行数统计
long endLineIndex = startIndex;//当前处理字节所在位置
while(channel.read(rbuf) != -1 && !isEnd){
int position = rbuf.position();
byte[] rbyte = new byte[position];
rbuf.flip();
rbuf.get(rbyte);
int startnum = 0;//每行的起始位置下标,相对于当前所读取到的byte数组
//判断是否有换行符
//如果读取到最后一行不是完整的一行时,则继续往后读取直至读取到完整的一行才结束
for(int i = 0; i < rbyte.length; i++){
endLineIndex++;
if(rbyte[i] == LF){//若存在换行符
if(channel.position() == startIndex){//若改数据片段第一个字节为换行符,说明第一行读取到的是完整的一行
isWholeLine = true;
startnum = i + 1;
}else{
byte[] line = new byte[temp.length + i - startnum + 1];
System.arraycopy(temp, 0, line, 0, temp.length);
System.arraycopy(rbyte, startnum, line, temp.length, i - startnum + 1);
startnum = i + 1;
lineCount++;
temp = new byte[0];
//处理数据
if(startIndex != 0){//如果不是第一个数据段
if(lineCount == 1){
if(isWholeLine){//当且仅当第一行为完整行时才处理
dataProcessHandler.process(line);
}
}else{
dataProcessHandler.process(line);
}
}else{
dataProcessHandler.process(line);
}
//结束读取的判断
if(endLineIndex >= endIndex){
isEnd = true;
break;
}
}
}
}
if(!isEnd && startnum < rbyte.length){//说明rbyte最后还剩不完整的一行
byte[] temp2 = new byte[temp.length + rbyte.length - startnum];
System.arraycopy(temp, 0, temp2, 0, temp.length);
System.arraycopy(rbyte, startnum, temp2, temp.length, rbyte.length - startnum);
temp = temp2;
}
rbuf.clear();
}
//兼容最后一行没有换行的情况
if(temp.length > 0){
if(dataProcessHandler != null){
dataProcessHandler.process(temp);
}
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public int getThreadNum() {
return threadNum;
}
public String getFilePath() {
return filePath;
}
public ExecutorService getThreadPool() {
return threadPool;
}
public int getBufSize() {
return bufSize;
}
}
2.数据处理接口:定义处理行数据的接口
/**
* 数据处理接口
* @author zyh
*
*/
public interface DataProcessHandler {
void process(byte[] data);
}
3.数据处理的具体实现类:
public class FileLineDataHandler implements DataProcessHandler {
private String encode = “GBK”;
@Override
public void process(byte[] data) {
try {
System.out.println(new String(data,encode).toString());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
public class MultiThreadReadByLine {
public static void main(String[] args){
FileReader fileReader = new FileReader(“D:\javaniotemp\test1.txt”,100,3);
fileReader.registerHanlder(new FileLineDataHandler());
fileReader.startRead();
}
}