一亿条数据的排序处理

时间:2022-07-25 23:31:58

假设场景:

某大型网站,活跃用户上亿个。(当然不是指同时在线人数,这里指的是再一段时间内有访问操作的用户数量,比如一个小时内)。

现在要每隔1小时,统计一次活跃用户排行榜(用户点击本网站的一个连接,活跃度就加1,按活跃度进行排名)。


首先,在此场景下,解决此问题不涉及数据库操作(也不可能用户点击一下,就更新一下数据库!),访问记录就是记录在日志文件中,例如:

zhangsan, http://a.com/b/

zhangsan, http://a.com/c/

lisi, http://a.com/b/

lisi, http://a.com/e/

lisi, http://a.com/x/


然后,我们不考虑用户访问量的统计过程,假设根据日志文件已经得出了这样的文件:

zhangsan 2

lisi 3

其中,2、3分别表示对应用户的活跃度,我们要按此进行排序,但是用户总量有一亿个!


接着,我们继续抽象、简化。既然活跃度用整数表示,我们就单独来考虑整数排序的问题,即,用户名也先不考虑了,就处理一亿个整数的排序。


先尝试直接使用TreeSet来排序。


TreeSet底层是红黑树实现的,排序是很高效的,这里不深究,我们就用它来完成排序:

1. 生产测试数据

package com.bebebird.data.handler;

import java.io.File;
import java.io.PrintWriter;
import java.util.Random;

/**
*
* @author sundeveloper
*
* 创建测试数据
*
*/
public class DataProducer {

/**
* 创建数据
* @param count 数据量
* @param out 输出文件路径
*/
public static void produce(int count, String out) {
long t1 = System.currentTimeMillis();
File file = new File(out);
if(file.exists())
file.delete();

try (PrintWriter writer = new PrintWriter(file, "UTF-8");) {
Random random = new Random();
for(int i=0; i<count; i++){
writer.write(random.nextInt(count) + "\n");
}
}catch (Exception e){
e.printStackTrace();
}
long t2 = System.currentTimeMillis();
System.out.println("创建成功!耗时:" + (t2 - t1) + "毫秒。");
}

}

调用produce()方法,指定数据量和数据输出路径,来生产测试数据。


2. 利用TreeSet对数据进行排序:

package com.bebebird.data.handler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.TreeSet;

/**
*
* @author sundeveloper
*
* 使用TreeSet自动将数据排序
*
* 处理数据量能达到千万级,一千万数据排序大约用时20秒。
*
*/
public class SimpleTreeSetHandler {

private Integer[] datas = null;

/**
* 排序
* @param in 数据文件路径
*/
public void sort(String in){
long t1 = System.currentTimeMillis();
File file = new File(in);
if(!file.exists())
return;

try(BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),"UTF-8"));){
TreeSet<Integer> set = new TreeSet<>();
String line = null;
while((line = reader.readLine()) != null && !"".equals(line)){
set.add(new Integer(line));
}
this.datas = set.toArray(new Integer[set.size()]);
}catch(Exception e){
e.printStackTrace();
}
long t2 = System.currentTimeMillis();

System.out.println("排序完成!耗时:" + (t2 - t1) + "毫秒。");
}

/**
* 从pos开始,获取count个数
* @param pos
* @param count
* @return
*/
public Integer[] limit(int pos, int count){
long t1 = System.currentTimeMillis();
if(pos < 0 || count <= 0){
return null;
}
Integer[] result = new Integer[count];
for (int i = 0; i < count && pos + i < this.datas.length; i++) {
result[i] = this.datas[pos + i];
}
long t2 = System.currentTimeMillis();
System.out.println("取数成功!耗时:" + (t2 - t1) + "毫秒。");
return result;
}

// 测试:
// 创建1千万随机数,进行排序
public static void main(String[] args) {
DataProducer.produce(10000000, "data");

SimpleTreeSetHandler handler = new SimpleTreeSetHandler();
handler.sort("data");

Integer[] limit = handler.limit(10, 10);
System.out.println(Arrays.asList(limit));
}
}

调用SimpleTreeSetHandler的sort()方法,指定数据文件路径,对其排序。


经测试,直接使用TreeSet来处理,一千万数据量很轻松就能处理,大概排序耗时20秒左右。

但是,一亿数据量时就废了!CPU满,内存占用上2.5G左右,并且N多分钟后不出结果,只能结束进程!(有条件的话,可以试试,具体多久能排出来)

机器配置简要说明:2.5 GHz Intel Core i5,系统内存10G。


3. 既然用TreeSet处理一千万数据很容易,那么把一亿条分成10个一千万不就能够处理了?每个一千万用时20秒,10个一千万大概200秒,三分钟拍出来还是可以接受的!(当然,这么算不准确,但大概是这个数量级的!)

package com.bebebird.data.handler;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
*
* @author sundeveloper
*
* 将数据进行分成若干片段;
* 分别对每个片段进行排序,存入临时文件;
* 将临时文件进行合并
*
*/
public class DivideTreeSetHandler {

/**
* 排序
* @param in 数据文件路径
* @param size 每个数据文件的大小(行数)
*/
public List<String> divide(String in, int size){
long t1 = System.currentTimeMillis();
File file = new File(in);
if(!file.exists())
return null;

List<String> outs = new ArrayList<String>();

try(BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),"UTF-8"));){
int fileNo = 0; // 临时文件编号
Set<Integer> set = new TreeSet<Integer>();
while(true){
String line = reader.readLine();

// 读取结束!
if(line == null){
writeSetToTmpFile(set, fileNo, outs);
break;
}

// 空行,跳过
if("".equals(line.trim())){
continue;
}

set.add(new Integer(line));

// 数据量达到:
if(set.size() >= size){
writeSetToTmpFile(set, fileNo, outs);
fileNo ++;
}
}

}catch(Exception e){
e.printStackTrace();
}
long t2 = System.currentTimeMillis();

System.out.println("拆分完成!耗时:" + (t2 - t1) + "毫秒。");

return outs;
}

// set数据写入到文件中:
private void writeSetToTmpFile(Set<Integer> set, int fileNo, List<String> outs) {
long t1 = System.currentTimeMillis();
File file = new File("tmp_" + fileNo);
if(file.exists())
file.delete();

try (PrintWriter writer = new PrintWriter(file, "UTF-8");) {
Iterator<Integer> iterator = set.iterator();
while(iterator.hasNext()){
writer.write(iterator.next() + "\n");
}
set.clear();
}catch (Exception e){
e.printStackTrace();
}
long t2 = System.currentTimeMillis();
System.out.println("生成临时文件:" + file.getAbsolutePath() + "!耗时:" + (t2 - t1) + "毫秒。");
outs.add(file.getAbsolutePath());
}

/**
* 合并数据
* @param ins
*/
public String combine(List<String> ins) {
long t1 = System.currentTimeMillis();

if(ins == null || ins.size() <= 1)
return null;

File file = new File("tmp");
if(file.exists())
file.delete();

try(PrintWriter writer = new PrintWriter(file, "UTF-8");){
List<BufferedReader> readers = new ArrayList<>();
for (String in : ins) {
readers.add(new BufferedReader(new InputStreamReader(new FileInputStream(in),"UTF-8")));
}

while(readers.size() > 0){
BufferedReader reader0 = readers.get(0);
while(true){
String line = reader0.readLine();
if(line == null){
readers.remove(0);
break;
}
if("".equals(line.trim()))
continue;

// 用个set记录从多个文件中取出的数据,这些数据需要继续排序:
Set<Integer> set = new TreeSet<Integer>();

int data = new Integer(line);

// 先把data放入set:
set.add(data);

for(int i = readers.size() - 1; i > 0; i--){
BufferedReader readeri = readers.get(i);
while(true){
// 设置一个标记,如果后边datai大于data了,需要reset到此处!
readeri.mark(1024);

String linei = readeri.readLine();
if(linei == null){
readers.remove(i);
break;
}
if("".equals(linei.trim()))
continue;

int datai = new Integer(linei);

// datai小于data,则把datai放入set,会自动排序
if(datai < data){
set.add(datai);
}
// datai等于data,则暂时退出,停止读取
else if(datai == data){
break;
}
// datai大于data,则往回退一行(退到标记处),停止读取
else{
readeri.reset();
break;
}
}
}

// 按data查找,小于data的值,都已经存入set了,此时把set输出到文件中:
Iterator<Integer> iterator = set.iterator();
while(iterator.hasNext()){
writer.write(iterator.next() + "\n");
}
set.clear();
}
}
}catch(Exception e){
e.printStackTrace();
}

long t2 = System.currentTimeMillis();
System.out.println("合并完成!耗时:" + (t2 - t1) + "毫秒。");

return file.getAbsolutePath();
}

/**
* 从pos开始,获取count个数
* @param pos
* @param count
* @return
*/
public Integer[] limit(int pos, int count, String in){
// TODO : 从排序后的文件中读取数据即可!不写了!
return null;
}

// 测试:
public static void main(String[] args) {
// 数据量:
int dataCount = 100000000;
// 分页数(拆分文件数):
int pageCount = 10;
// 每页数据量:
int perPageCount = dataCount / pageCount;

// 生成一亿数据:
DataProducer.produce(dataCount, "data");

DivideTreeSetHandler handler = new DivideTreeSetHandler();

// 拆分排序:
List<String> tmps = handler.divide("data", perPageCount);

// 合并排序:
String tmp = handler.combine(tmps);

// 获取数据:
Integer[] limit = handler.limit(10, 10, tmp);
}

}

调用DivideTreeSetHandler的divide()方法,指定数据文件、拆分的每页放多少数据,将数据拆分。当然,拆分的时候就已经分别使用TreeSet排序了!

调用DivideTreeSetHandler的combine()方法,将拆分后的若干个文件进行合并,合并的过程中同样也会排序!

最终,输出一个完全排序了的文件。


经测试,一亿数据量,拆分加合并共用时约3.6分钟(包含各种IO操作的用时),可以接受。


到这里,核心问题解决了,剩余的就是对象排序了,把用户、活跃度封装成对象,用TreeSet将对象进行排序,对象实现compareTo,重写hashcode、equals等等,就不再多说了。



当然,DivideTreeSetHandler的还有很多优化空间,比如,可以把拆分、合并用多线程来处理。这里就先不搞了,有空再说。



说明:

写代码时,并不知道这种排序算法已经有名字了(叫“归并排序”),还想着为其命名呢~

实际上,是受到hadoop的map-reduce思想的启发,想到用这个方法来处理。

思想都是想通的:一个人搞不了了,就要分而治之!