NIO MappedByteBuffer读大文件并统计出现次数最多的TOP K个单词

时间:2020-12-08 17:21:08

最近学习NIO了解到MappedByteBuffer读取大文件很有优势,遂在网上搜索观看了好几篇博客,但大多数讲的都是理论。对于实战demo很少,或者过于简单,现结合一道常见的面试题:如何读取大文件并统计出现次数最多的TOP K个单词(IP/关键字),写了以下代码,以作记录并供人参考。

package com.io.nio;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* 读文件,统计单词数
* @author wuwenhai
* @since JDK1.6
* @history 2017-7-11 wuwenhai 新建
*/
public class WordsCountMappedByteBuffer {

static final int BUFFER_SIZE = 1024*10;
public static Map<String,Integer> map=new HashMap<String,Integer>();
static byte[] left=new byte[0];
static final int LF = 10;//换行符 ASCII
static final int CR = 13;//回车符 ASCII

@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
long start=System.currentTimeMillis();
FileChannel channel= new FileInputStream(new File("E://Alllog.log")).getChannel();
MappedByteBuffer buffer=channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
byte[] byteArray=new byte[BUFFER_SIZE];
while(buffer.hasRemaining()){
if(buffer.limit()-buffer.position()>BUFFER_SIZE){
buffer.get(byteArray);
deal(byteArray,false);
}else{
byte[] byteArray2=new byte[buffer.limit()-buffer.position()];
buffer.get(byteArray2);
deal(byteArray2,true);
}
}
long end=System.currentTimeMillis();
System.out.println("spend time :"+(end-start));//如果想看MappedByteBuffer的读取速度,可以将两个deal注释掉
System.out.println("count of total words :"+map.size());//单词总数
//show();
findTop(10);
//sortTreeMapShow();
}
/**
* 生硬地截取BUFFER_SIZE字节,必然会导致开头和末尾有问题,所以可以将byteArray分为2部分来处理。
* 从末尾逆向寻找最后一个分隔符坐标endIndex,将endIndex-末尾作为第二部分。
* 将0-endIndex作为第一部分。
* 每次deal的数据是:上一次deal的第二部分+本次deal的第一部分,同时保存本次deal的第二部分,以供下次deal使用。
* @param byteArray
* @param EOF
*/
public static void deal(byte[] byteArray,boolean EOF){
String input;
if(!EOF&&findLastDelimiter(byteArray)>0){
int endIndex=findLastDelimiter(byteArray);
byte[] firstPart=Arrays.copyOfRange(byteArray, 0,endIndex);
byte[] secondPart=Arrays.copyOfRange(byteArray, endIndex,byteArray.length);
byte[] mergeArray=arrayMerge(left,firstPart);
input=new String(mergeArray);
left=secondPart;
}else if(!EOF&&findLastDelimiter(byteArray)<0){//如果读取的字节数组里面没有分隔符,则该次读取的整个字节数组与left合并后保存至left供下次deal使用。
byte[] mergeArray=arrayMerge(left,byteArray);
left=mergeArray;
return;
}else{//如果是读到文件末尾了,则该次读取的整个字节数组与left合并后并处理
byte[] mergeArray=arrayMerge(left,byteArray);
input=new String(mergeArray);
}

Pattern pattern=Pattern.compile("[A-Z]?[a-z]+");
Matcher matcher=pattern.matcher(input);
while (matcher.find()) {
String word=matcher.group();
if(map.containsKey(word)){
map.put(word, map.get(word)+1);
}else{
map.put(word, 1);
}
}
}

public static int findLastDelimiter(byte[] byteArray){
for(int i=byteArray.length-1;i>=0;i--){
if(byteArray[i]==LF||byteArray[i]==CR){
return i;
}
}
return -1;
}

/**
* 数组合并
* @param first
* @param second
* @return
*/
public static byte[] arrayMerge(byte[] first,byte[] second){
byte[] result = Arrays.copyOf(first, first.length + second.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}

/**
* 遍历显示
*/
public static void show(){
for(Map.Entry<String, Integer> entry:map.entrySet()){
System.out.println(entry.getKey()+" : "+entry.getValue());
}
}

/**
* 寻找map结果中top k的数据,不排序
* 思路:寻找top k其实就是寻找某个数,列表里大于这个数的个数有k个。
* @param k
*/
public static void findTop(int k){
int max=0,min=Integer.MAX_VALUE,mid=0;
for(Map.Entry<String, Integer> entry:map.entrySet()){
if(entry.getValue()>max) max=entry.getValue();
if(entry.getValue()<min) min=entry.getValue();
}
while(max-min > 1)
{
mid = (max+min)/2;
if(countBiggerThanMid(map,mid) > k){
min = mid;
}else if(countBiggerThanMid(map,mid) < k){
max = mid;
}else{
break;
}
}
for(Map.Entry<String, Integer> entry:map.entrySet()){
if(entry.getValue()>=mid){
System.out.println(entry.getKey()+" : "+entry.getValue());
}
}
}

/**
* 在区间中寻找比mid大的数据的个数
* @param map
* @param mid
* @return
*/
public static int countBiggerThanMid(Map<String,Integer> map ,int mid){
int count=0;
for(Map.Entry<String, Integer> entry:map.entrySet()){
if(entry.getValue()>=mid)
count++;
}
return count;
}

/**
* 排序并显示
*/
public static void sortTreeMapShow(){
ValueComparator comparator = new ValueComparator(map);
TreeMap<String,Integer> treeMap = new TreeMap<String,Integer>(comparator);
treeMap.putAll(map);
System.out.println("results: "+treeMap);
}
}

class ValueComparator implements Comparator<String> {

Map<String, Integer> map;
public ValueComparator(Map<String, Integer> map) {
this.map = map;
}

public int compare(String key1, String key2) {
if (map.get(key1) >= map.get(key2)) {
return -1;
} else {
return 1;
}
}
}

在我本机(Window7 PC)测试以上代码读取了1.21G的日志文件,并统计了单词个数,用时51130ms。输出结果如下:

spend time :51130
count of total words :17989
org : 5876624
spring : 3572543
web : 5178433
servlet : 5061979
com : 3177096
ssm : 4061538
springframework : 5643885
lang : 2932488
Mapping : 3863405
java : 3002298


在jvisualvm中观察得到如下结果(可能有同学不知道怎么打开jvisualvm,啰嗦一下,在C:\Program Files\Java\jdk1.7.0_79\bin里面找到jvisualvm.exe,打开就好):

NIO MappedByteBuffer读大文件并统计出现次数最多的TOP K个单词

可以看到读上G的文件,堆内存使用才一百多M,因为内存映射文件并不在堆内存中,只是在需要的时候才发生缺页中断去读取。

PS:

上G的日志文件估计大家也不容易拿到,因为这只是一个例子,达到学习目的即可,所建议大家下载一些英语小说之类的文本文件(几十几百K),执行下面代码(多执行几次),即可获得需要大小的文件。

package com.io.io;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;

public class AppendFile {


/**
* 文件合并
* @param dest
* @param src
*/
private static void fileMerge(String dest, String src) {
for(int i=0;i<20;i++)
try {
long start =System.currentTimeMillis();
FileInputStream inputStream=new FileInputStream(new File(src));
FileOutputStream outputStream=new FileOutputStream(new File(dest),true);
byte[] buffer=new byte[1024*10];
int count;
while((count=inputStream.read(buffer))>0){
outputStream.write(buffer,0,count);
}
long end =System.currentTimeMillis();
System.out.println("spend time: "+(end-start));
inputStream.close();
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
fileMerge("I://debug.txt","I://debug2.txt");
}

}



理论:

1、在传统的文件IO操作中,都是调用操作系统提供的底层标准IO系统调用函数read()write(),此时调用此函数的进程(在JAVA中即JAVA进程)由当前的用户态切换到内核态,然后OS的内核代码负责将相应的文件数据读取到内核的IO缓冲区,然后再把数据从内核IO缓冲区拷贝到进程的私有地址空间中去,这样便完成了一次IO操作。为什么要搞一个内核IO缓冲区把原本只需一次拷贝数据的事情搞成需要2次数据拷贝呢?这是因为局部性原理,具体是空间局部性原理,OS根据在一次read()系统调用过程中预读更多的文件数据缓存在内核IO缓冲区中,当继续访问的文件数据在缓冲区中时便直接拷贝数据到进程私有空间,避免了再次的低效率磁盘IO操作。

2、当读取OS内核缓冲区数据的时候,便发起了一次系统调用操作(通过nativeC函数调用),而系统调用的代价相对来说是比较高的,涉及到进程用户态和内核态的上下文切换等一系列操作,IO包中提供了BufferedXXX类来作为缓冲区。如read()时候(synchronized)BufferedInputStream会根据情况自动预读更多的字节数据到它自己维护的一个内部字节数组缓冲区中(byte数组,默认8192字节),这样便可以减少系统调用次数,从而达到其缓冲区的目的。所以要明确的一点是BufferedInputStream的作用不是减少磁盘IO操作次数(这个OS已经做了),而是通过减少系统调用次数来提高性能的。如:

FileInputStream in = new FileInputStream("D:\\java.txt");   

BufferedInputStream buf_in = new BufferedInputStream(in);  

buf_in.read();

3、内存映射文件和标准IO操作最大的不同之处就在于它虽然最终也是要从磁盘读取数据,但是它并不需要将数据读取到OS内核缓冲区,而是直接将进程的用户私有地址空间中的一部分区域与文件对象建立起映射关系,就好像直接从内存中读、写文件一样。一开始并没有拷贝数据,而是当进程代码第一次引用这段代码内的虚拟地址时,触发了缺页异常,这时候OS根据映射关系直接将文件的相关部分数据拷贝到进程的用户私有空间中去,当有操作第N页数据的时候重复这样的OS页面调度程序操作。java中提供了3种内存映射模式,即:只读(readonly)、读写(read_write)、专用(private),对于只读模式来说,如果程序试图进行写操作,则会抛出ReadOnlyBufferException异常;第二种的读写模式表明了通过内存映射文件的方式写或修改文件内容的话是会立刻反映到磁盘文件中去;最后一种专用模式采用的是OS的“写时拷贝”原则,即在没有发生写操作的情况下,多个进程之间都是共享文件的同一块物理内存(进程各自的虚拟地址指向同一片物理地址),一旦某个进程进行写操作,那么将会把受影响的文件数据单独拷贝一份到进程的私有缓冲区中,不会反映到物理文件中去。如:

MappedByteBuffer buff = channel.map(FileChannel.MapMode.READ_ONLY, 0,channel.size());  

4、内存映射文件属于JVM中的直接缓冲区,还可以通过ByteBuffer.allocateDirect(),即DirectMemory的方式来创建直接缓冲区。直接内存DirectMemory的大小默认为-XmxJVM堆的最大值,但是并不受其限制,而是由JVM参数MaxDirectMemorySize单独控制。直接内存容易引起Full GC,因为其不受Minor GC影响,当众多的DirectByteBuffer对象从新生代被送入老年代后触发了Full GC时候才会回收直接内存。



如果感觉有用,恳请去github帮忙点个star,万谢。

https://github.com/a4227139/testNew