假设有一本电子书(file.txt),文件大小1G,编写一个函数,以最快的速度计算出出现频次前20名的单词,以及总共有多少行。
运行环境为java,jvm主要参数:-server -Xms32m -Xmx32m -Xmn32m -Xss356k -XX:PermSize=16m -XX:MaxPermSize=16m。计算机配置内存为4G,磁盘空间为128G。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* 统计1G文件中出现次数最多的单词
*
* @version $Id: CountWord.java, v 0.1 2018年02月03日 21:19 yunmo.hl Exp $
*/
public class CountWord {
private static String testWordsFile = "/Users/huleiwind/Downloads/words.txt";
private static int splitCount = 100;
private static int minHeapSize = 20;
private ExecutorService executorService;
private CompletionService<Node[]> completionService;
public CountWord() {
executorService = Executors.newFixedThreadPool(splitCount);
completionService = new ExecutorCompletionService<Node[]>(executorService);
}
public static void main(String args[]) {
CountWord countWord = new CountWord();
long start = System.currentTimeMillis();
int totalCount = 0;
//读取原始文件
Scanner scanner = null;
try {
File file = new File(testWordsFile);
scanner = new Scanner(file);
String str;
//读取每一行拆分文件
while (scanner.hasNextLine()) {
totalCount++;
str = scanner.nextLine();
//计算hash值
int hashCode = str.hashCode();
int fileName = (hashCode & 0x7FFFFFFF) % splitCount;
// 拆分成100个文件
writeToFile(String.valueOf(fileName), str);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
System.out.println("total行数:" + totalCount);
countWord.findTop();
//统一扫描这100个文件的结果
long end = System.currentTimeMillis();
System.out.println("Costs " + (end - start) + "ms");
}
private void findTop() {
//利用最小堆计算每个文件,并保存结果
for (int i = 0; i < splitCount; i++) {
completionService.submit(new FindTop(i + ""));
}
MinHeap minHeap = new MinHeap();
for (int i = 0; i < splitCount; i++) {
try {
Node[] nodes = completionService.take().get();
for (Node node : nodes) {
if (node.getCount() > minHeap.getRoot().getCount()) {
minHeap.setRoot(node);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 打印最终统计结果
for (Node node : minHeap.getNodes()) {
System.out.println(node.getWord() + " " + node.getCount());
}
executorService.shutdown();
}
/**
* Callable of finding top elements <br>
*/
private class FindTop implements Callable<Node[]> {
private String fileName;
public FindTop(String fileName) {
this.fileName = "/Users/huleiwind/Downloads/countWord/" + fileName + ".txt";
}
@Override
public Node[] call() throws Exception {
Map countMap = new HashMap();
MinHeap minHeap = new MinHeap();
try {
FileReader fileReader = new FileReader(fileName);
BufferedReader bufferedReader = new BufferedReader(fileReader);
String str;
while ((str = bufferedReader.readLine()) != null) {
if (countMap.get(str) == null) {
countMap.put(str, "0");
} else {
countMap.put(str, Integer.parseInt(String.valueOf(countMap.get(str))) + 1);
}
}
for (Object key : countMap.keySet()) {
Node node = new Node(String.valueOf(key),
Integer.parseInt(String.valueOf(countMap.get(key))));
Node root = minHeap.getRoot();
// 当数据大于堆中最小的数(根节点)时,替换堆中的根节点,再转换成堆
if (Integer.parseInt(String.valueOf(countMap.get(key))) > root.getCount()) {
minHeap.setRoot(node);
}
}
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
return minHeap.getNodes();
}
}
/**
* 追加文件:使用FileWriter
*
* @param fileName
* @param content
*/
private static void writeToFile(String fileName, String content) {
try {
File file = new File("/Users/huleiwind/Downloads/countWord/" + fileName + ".txt");
// 打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件
FileWriter writer = new FileWriter(file, true);
writer.write(content + "\n");
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 最小堆节点
*/
static class Node {
private String word = "Empty";
private int count;
public Node() {
}
public Node(String word, int count) {
this.word = word;
this.count = count;
}
// ~~~~~~~~~~~~~~~setter getter ~~~~~~~~~~~~~~~~~~~
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
/**
* 最小堆
*/
static class MinHeap {
private Node[] nodes;
// 初始化数组
public MinHeap() {
this.nodes = new Node[minHeapSize];
for (int i = 0; i < minHeapSize; i++) {
nodes[i] = new Node();
}
buildMinHeap();
}
public Node[] getNodes() {
return nodes;
}
public void setNodes(Node[] nodes) {
this.nodes = nodes;
}
/**
* 将数组转成最小堆
*/
private void buildMinHeap() {
// 完全二叉树只有数组下标小于或等于 (data.length) / 2 - 1 的元素有孩子结点,遍历这些结点。,比如上面的图中,数组有10个元素, (data.length) / 2 - 1的值为4,a[4]有孩子结点,但a[5]没有*
for (int i = (nodes.length) / 2 - 1; i >= 0; i--) {
// 对有孩子结点的元素heapify
heapify(i);
}
}
private void heapify(int i) {
// 获取左右结点的数组下标
int l = left(i);
int r = right(i);
// 这是一个临时变量,表示 跟结点、左结点、右结点中最小的值的结点的下标
int smallest = i;
// 存在左结点,且左结点的值小于根结点的值
if (l < nodes.length && nodes[l].count < nodes[i].count) {
smallest = l;
}
// 存在右结点,且右结点的值小于以上比较的较小值
if (r < nodes.length && nodes[r].count < nodes[smallest].count) {
smallest = r;
}
// 左右结点的值都大于根节点,直接return,不做任何操作
if (i == smallest) {
return;
}
// 交换根节点和左右结点中最小的那个值,把根节点的值替换下去
swap(i, smallest);
// 由于替换后左右子树会被影响,所以要对受影响的子树再进行heapify
heapify(smallest);
}
// 获取右结点的数组下标
private int right(int i) {
return (i + 1) << 1;
}
// 获取左结点的数组下标
private int left(int i) {
return ((i + 1) << 1) - 1;
}
// 交换元素位置
private void swap(int i, int j) {
Node tmp = nodes[i];
nodes[i] = nodes[j];
nodes[j] = tmp;
}
// 获取对中的最小的元素,根元素
public Node getRoot() {
return nodes[0];
}
// 替换根元素,并重新heapify
public void setRoot(Node root) {
nodes[0] = root;
heapify(0);
}
}
}