作者:京东物流 秦彪
1. 引言
排序是一个Java开发者,在日常开发过程中随处可见的开发内容,Java中有丰富的API可以调用使用。在Java语言中,作为集合工具类的排序方法,必定要做到通用、高效、实用这几点特征。使用什么样排序算法会比较合适,能够做到在尽量降低时间、空间复杂度的情况下,又要兼顾保证稳定性,达到优秀的性能。可能从性能角度出发首先会想到的是快速排序,或者归并排序。作为jdk提供的通用排序功能,使用又如此频繁,肯定有独特之处,一起来看学习下期中的奥秘。
文中不会过多的介绍几大基本排序算法的方式、由来和思想,主要精力集中在一块探讨java中排序方法所使用的算法,以及那些是值得我们学习和借鉴的内容。文中如有理解和介绍的错误,一起学习,一起探讨,一起进步。
2. 案例
日常使用最为频繁的排序,莫过于如下代码案例,给定一个现有的序列进行一定规则下的排序,配合java8的stream特性,可以很方便的对一个集合进行排序操作(排序规则只是对排序对象及排序方案的限定,不在本文讨论范围内)。
List<Integer> list = Arrays.asList(10, 50, 5, 14, 16, 80);
System.out.println(list.stream().sorted().collect(Collectors.toList()));
在代码执行的过程中SortedOps.java类中 Arrays.sort(array, 0, offset, comparator); 执行了Array集合类型的sort排序算法。
@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
如果使用Collections.sort() 方法如下打印 list1 和 list2 结果一样,且调用的都是 Arrays 集合类中的 sort 方法。
List<Integer> list1 = Arrays.asList(10, 50, 5, 14, 16, 80);
System.out.println(list1.stream().sorted().collect(Collectors.toList()));
List<Integer> list2 = Lists.newArrayList();
list2.addAll(list1);
Collections.sort(list2);
System.out.println(list2);
// 输出:
// [5, 10, 14, 16, 50, 80]
// [5, 10, 14, 16, 50, 80]
2. Collections.sort 方法介绍
Collections类中关于sort方法定义如下:
public static <T extends Comparable<? super T>> void sort(List<T> list) {
list.sort(null);
}
通过该方法注释,查看到有三项值得关注的信息,大概意思是该方法实现了稳定且默认升序排序的功能。
1. Sorts the specified list into ascending order, according to the Comparable natural ordering of its elements.
2. This sort is guaranteed to be stable equal elements will not be reordered as a result of the sort.
3. The specified list must be modifiable, but need not be resizable.
进入sort,代码进入到List类的sort方法,发现方法将入参list先转为了数组Object[],之后利用Arrays.sort进行排序。
default void sort(Comparator<? super E> c) {
Object[] a = this.toArray();
Arrays.sort(a, (Comparator) c);
ListIterator<E> i = this.listIterator();
for (Object e : a) {
i.next();
i.set((E) e);
}
}
首先在这里思考一个问题为什么要转为数组,问题答案已经在方法的英文注释中说明白了。
* The default implementation obtains an array containing all elements in
* this list, sorts the array, and iterates over this list resetting each
* element from the corresponding position in the array. (This avoids the
* n<sup>2</sup> log(n) performance that would result from attempting
* to sort a linked list in place.)
是为了避免直接对List<T>的链表进行排序,从而耗费O(n2logn) 时间复杂度。当然这里在this.toArray()时,为了将list强行变为数组会损失一些性能和空间开销,源码中使用了System.arraycopy调用底层操作系统方法进行数据复制,详细内容可以查看相关实现。 继续进入Arrays类的sort方法定义中,我们没有使用比较器,LegacyMergeSort.userRequested表示进入老的归并排序算法,默认是关闭的,直接进入本文重点关注的TimSort.sort(…)方法。
public static <T> void sort(T[] a, Comparator<? super T> c) {
if (c == null) {
sort(a);
} else {
if (LegacyMergeSort.userRequested)
legacyMergeSort(a, c);
else
TimSort.sort(a, 0, a.length, c, null, 0, 0);
}
}
3. TimSort 算法介绍
Timsort是一个自适应的、混合的、稳定的排序算法,是由Tim Peter于2002年发明的,最早应用在Python中,现在广泛应用于Python、Java、Android 等语言与平台中,作为基础的排序算法使用。其中Java语言的Collection.sort在JDK1.6使用的是普通的归并排序,归并排序虽然时间复杂度低,但是空间复杂度要求较高,所以从JDK1.7开始就更改为了TimSort算法。
Timsort 的时间复杂度是 O(n log n),与归并排序的时间复杂度相同,那它的优势是啥呢,实际上可以认为TimSort排序算法是归并排序算法的优化版,从它的三个特征就可以看出,第二个特征“混合的”,没错,它不单纯是一种算法,而是融合了归并算法和二分插入排序算法的精髓,因此能够在排序性能上表现优异。其它两个特征自适应和稳定性会在文章后面讲到。首先从算法性能统计上做个对比:
可以看出TimSort排序算法,平均和最坏时间复杂度是O(nlogn),最好时间复杂度是O(n),空间复杂度是O(n),且稳定的一种排序算法。在稳定算法中,从性能效果上对比来看和二叉排序算法一样。
3.1 TimSort的核心思想
那TimSort算法的核心思想是什么呢,首先原始的TimSort对于长度小于64的数据(java中是32),会直接选择二分插入排序,效率很高。其次,TimSort算法的初衷认为现实中的数据总是部分有序的。这句话很关键,怎么理解呢,比如列表[5, 2, 8, 5, 7,23, 45, 63],里面的[5, 2] 和 [8, 5] 和 [7, 23, 45,63] 各子列表中就是有序的,要么升序要么降序,这就是TimSort的基本根据。
基于此会发现待排序列表已经部分有序了,所以会在排序过程中尽量不要破坏这种顺序,就可以做到减少排序时间消耗。基本思想说完了,由此引出TimSort算法的几个概念:run和minrun。
run是指连续升序或者连续降序的最长子序列(降序和升序可以相互转换),而minrun是一个设定值,实际上是每个run的长度最小值。所以TimSort会对待排序序列进行划分,找出连续有序的子序列,如果子序列长度不满足这点要求,就将后续数据插入到前面的子序列中。
举个例子,待排序序列[5, 2, 8, 5, 7,23, 45, 63], 如果minRun = 3,那分割后的run会有以下:[2, 5, 8]、[5,7,23,45,63] 两个子序列,最终通过合并这两个run得到[2,5,5,7,8,23,45,63]
是不是有个疑问: minrun怎么选择得到的?该值是通过大量的统计计算给出的minrun长度建议是在32 ~ 64之间取值效率比较高,具体在java代码中可能会有所不同。
接着来看,假设现在有序子序列已经拆分好了,需要进入到合并过程中了,TimSort是如何合并子序列的。对于归并排序我们都知道,序列先归后并,两两组合利用一个空数组直接进行比较就合并了。但是在TimSort算法中,合并过程是实时的,每次算出一个run就可能做一次合并。这个过程利用了栈结构,且需要遵循相邻的run才可以合并,也就是只有相邻的栈元素可以进行合并。
规则如下:假设当前有三个run子序列依次入栈,现在栈顶有三个元素从上至下依次为x3、x2、x1,它们的长度只要满足以下两个条件中的任何一个就进行合并:
(1)x1 <= x2 + x3
(2)x1 <= x2
满足这个条件的三个序列,像汉诺塔一样长度由下往上依次减小。刚才提到合并run的过程是实时的,也就是每产生一个run就进行一次合并操作。举例说明下,当前假设待排序序列[2,6,8,4,2,5,7,9,10,11,4,25,64,32,78,99],其中再假设minrun=3是合理的。合并过程是这样的,注意这里的压栈和弹栈不一定需要对子序列本身进行操作,不是真的将子序列放入栈中,而只需要run标识以及长度即可,因为栈元素比较的是run长度。
(1)首先第一个run0是[2,6,8],而第二个run1是[2,4,5],此时依次将其放入栈中,发现满足第二个条件,这两个run进行合并,合并后将旧序列从栈中弹出,得到新的run0是[2,2,4,5,6,8],再次压入栈中。
(2)继续从原序列中找到新的run1是[7,9,10,11],压入栈中,此时run0和run1不满足条件不需要合并。继续从原序列中找到run2是[4,25,64],压入栈中,此时满足第一个条件,这里的run1和run2需要进行合并,合并后将旧序列从栈中弹出,新run1是[4,7,9,10,11,25,64],压入栈中。
(3)此时发现run0和run1满足第二个条件,继续合并弹出旧序列,得到新run0是[2,2,4,4,5,6,7,8,9,10,11,25,64],压入栈中。
(4)继续从原序列中找到新的run1是[32,78,99],压入栈中。此时发现没有更多元素,而条件是不满足的,依然进行一次合并,弹出旧序列,压入合并后的新子序列run0是[2,2,4,4,5,6,7,8,9,10,11,25,32,64,78,99]
(5)此时将run0拷贝到原序列就完成了排序
为什么要设置这么两个合并run的严格条件,直接压栈合并岂不更好?目的是为了避免一个较长的有序片段和一个较小的有序片段进行归并,在合并长度上做到均衡效率才高。
在合并run的过程中会用到一种所谓的gallop(飞奔)模式,能够减少参与归并的数据长度,主要过程如下:假设有待归并的子序列x和y,如果x的前n个元素都是比y首元素小的,那这n个元素实际上就不用参与归并了。原因就是这n个元素本来已经有序了,归并后还是在原来的位置。同理而言,如果y的最后几个元素都比x最后一个元素小,那y的最后这n个元素也就不必参与归并操作了,这样就可以减少归并长度,减少来回复制多余数据的开销。
3.2 Java源码
探讨完TimSort的核心思想及其排序过程,现在来看下java代码是如何实现的。Java1.8中的TimSort类位置在java.util.TimSort
static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c,
T[] work, int workBase, int workLen) {
assert c != null && a != null && lo >= 0 && lo <= hi && hi <= a.length;
int nRemaining = hi - lo;
if (nRemaining < 2)
return;
if (nRemaining < MIN_MERGE) {
int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
binarySort(a, lo, hi, lo + initRunLen, c);
return;
}
TimSort<T> ts = new TimSort<>(a, c, work, workBase, workLen);
int minRun = minRunLength(nRemaining);
do {
int runLen = countRunAndMakeAscending(a, lo, hi, c);
if (runLen < minRun) {
int force = nRemaining <= minRun ? nRemaining : minRun;
binarySort(a, lo, lo + force, lo + runLen, c);
runLen = force;
}
ts.pushRun(lo, runLen);
ts.mergeCollapse();
lo += runLen;
nRemaining -= runLen;
} while (nRemaining != 0);
assert lo == hi;
ts.mergeForceCollapse();
assert ts.stackSize == 1;
}
变量nRemaining记录的是待排序列表中剩余元素个数, MIN_MERGE就是前文中提到的java中的minrun值是32。如果nRemaining<32,用countRunAndMakeAscending(…)方法得到连续升序的最大个数,里面涉及到升序降序调整。可以看到如果待排序列表小于32长度,就进行二分插入排序binarySort(…)。
如果待排序列表长度大于32,调用TimSort对象的minRunLength(nRemaining) 计算minRun,这里就体现了动态自适应,具体来看代码中是如何做的。r为取出长度n的二进制每次右移的一个溢出位值,n每次右移1位,直到长度n小于32。n+r最终结果就是保留长度n的二进制的高5位再加上1个移除位。根据注释可以看出:
- 如果待排序数组长度为2的n次幂,比如1024,则minRun = 32/2 = 16
- 其它情况的时候,逐位右移,直到找到介于16<=k<=32的值。
假如待排序列表长度是7680,二进制是1111000000000,按照操作后是11110十进制是30,再加上移除位0是30,所以minRun=30
private static int minRunLength(int n) {
assert n >= 0;
int r = 0; // Becomes 1 if any 1 bits are shifted off
while (n >= MIN_MERGE) {
r |= (n & 1);
n >>= 1;
}
return n + r;
}
接下来在循环中进行处理:
(1) 计算最小升序的run长度,如果小于minRun,使用二分插入排序将run的长度补充到minRun要求的长度。
(2) ts.pushRun(lo, runLen) ,通过栈记录每个run的长度,这里lo是run的第一个元素的索引用来标记操作的是哪个run,runLen是run的长度。
private void pushRun(int runBase, int runLen) {
this.runBase[stackSize] = runBase;
this.runLen[stackSize] = runLen;
stackSize++;
}
(3)ts.mergeCollapse(); 通过计算前面提到的两个run合并的限定条件,分别是:
- runLen[n-1] <= runLen[n] + runLen[n+1]
- runLen[n] <= runLen[n + 1]
private void mergeCollapse() {
while (stackSize > 1) {
int n = stackSize - 2;
if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
if (runLen[n - 1] < runLen[n + 1])
n--;
mergeAt(n);
} else if (runLen[n] <= runLen[n + 1]) {
mergeAt(n);
} else {
break; // Invariant is established
}
}
}
(4) 这里的mergeAt(n) 归并排序过程,之前有提到是经过优化后所谓gallop模式的归并排序,具体表现在方法中的gallopRight和gallopLeft方法。
int k = gallopRight(a[base2], a, base1, len1, 0, c);
assert k >= 0;
base1 += k;
len1 -= k;
if (len1 == 0)
return;
len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c);
assert len2 >= 0;
if (len2 == 0)
return;
假设有X序列[4,9,21,23], Y序列[5,7,12,13,14,15,17],由于X中4小于Y序列最小元素5,所以合并后4必然是第一个元素;而Y序列中尾元素17比X中的[21,23]小,所以X中的[21,23]必然是合并最后两元素。
4. DivalQuickSort 算法介绍
前文案例中提到SortedOps.java类,该类中对于基本类型的排序调用 Arrays.sort(ints); 或 Arrays.sort(longs); 再或 Arrays.sort(doubles); 使用了DivalQuickSort排序算法。
public static void sort(int[] a) {
DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0);
}
4.1 DivalQuickSort 核心思想
快速排序使用的是在排序序列中选择一个数作为分区点pivot,也就是所谓的轴,然后以此轴将数据分为左右两部分,大于该值的为一个区,小于该值的为一个区,利用分治和递归思想实现。如下假设选择19为pivot值。
双轴快排,如其名字所示,就是会选取两个数作为pivot,这样就会划分出三个区间,实际在执行中会有4个区间,分别是小于等于pivot1区间;pivot1和pivot2之间区间,待处理区间; 大于等于pivot2区间。如下假设选择10和19为两个pivot值。
每次递归迭代时遍历待处理的区域数据,然后比较它应该放的位置,并进行交换操作,逐渐压缩待处理区域的数据长度,处理掉待处理区域的元素数据;执行完毕一轮数据后交换pivot数值,然后各自区间再进行递归排序即可。
4.2 Java源码
static void sort(int[] a, int left, int right,
int[] work, int workBase, int workLen) {
if (right - left < QUICKSORT_THRESHOLD) {
sort(a, left, right, true);
return;
}
int[] run = new int[MAX_RUN_COUNT + 1];
int count = 0; run[0] = left;
for (int k = left; k < right; run[count] = k) {
if (a[k] < a[k + 1]) { // ascending
while (++k <= right && a[k - 1] <= a[k]);
} else if (a[k] > a[k + 1]) { // descending
while (++k <= right && a[k - 1] >= a[k]);
for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) {
int t = a[lo]; a[lo] = a[hi]; a[hi] = t;
}
} else { // equal
for (int m = MAX_RUN_LENGTH; ++k <= right && a[k - 1] == a[k]; ) {
if (--m == 0) {
sort(a, left, right, true);
return;
}
}
}
if (++count == MAX_RUN_COUNT) {
sort(a, left, right, true);
return;
}
}
if (run[count] == right++) {
run[++count] = right;
} else if (count == 1) {
return;
}
byte odd = 0;
for (int n = 1; (n <<= 1) < count; odd ^= 1);
int[] b;
int ao, bo;
int blen = right - left;
if (work == null || workLen < blen || workBase + blen > work.length) {
work = new int[blen];
workBase = 0;
}
if (odd == 0) {
System.arraycopy(a, left, work, workBase, blen);
b = a;
bo = 0;
a = work;
ao = workBase - left;
} else {
b = work;
ao = 0;
bo = workBase - left;
}
for (int last; count > 1; count = last) {
for (int k = (last = 0) + 2; k <= count; k += 2) {
int hi = run[k], mi = run[k - 1];
for (int i = run[k - 2], p = i, q = mi; i < hi; ++i) {
if (q >= hi || p < mi && a[p + ao] <= a[q + ao]) {
b[i + bo] = a[p++ + ao];
} else {
b[i + bo] = a[q++ + ao];
}
}
run[++last] = hi;
}
if ((count & 1) != 0) {
for (int i = right, lo = run[count - 1]; --i >= lo;
b[i + bo] = a[i + ao]
);
run[++last] = right;
}
int[] t = a; a = b; b = t;
int o = ao; ao = bo; bo = o;
}
}
通过看代码可以看出,java中的双轴快排在片段数据长度及一定条件的情况下,还使用了其它诸如归并、插入等排序算法。
由于DivalQuickSort算法实现内容比较复杂,文中重点讲解了TimSort算法,待笔者研究透彻后进行补充。
5. 相同环境排序时间对比
想要真正模拟一模一样运行环境,是困难的。这里只是模拟相同的数据集,在相同机器上,其实就是平时办公的机器,统计不同排序算法排序过程中耗费的时间,这里的结果仅供参考。
模拟数据:随机得到1亿个范围在0到100000000内的整型元素构成数组,分别基于快速排序、普通归并排序、TimSort的排序算法得到耗时结果如下,单位ms。
验证计划 |
快排 |
普通归并 |
双轴快排 |
第1轮 |
26972 |
26247 |
15151 |
第2轮 |
20850 |
32457 |
17811 |
第3轮 |
20848 |
25437 |
14957 |
第4轮 |
17066 |
26542 |
16429 |
第5轮 |
19264 |
34206 |
|
第6轮~第95轮 |
… |
… |
|
第96轮 |
19433 |
28887 |
18681 |
第97轮 |
18670 |
27421 |
16705 |
第98轮 |
19084 |
28301 |
17862 |
第99轮 |
18645 |
27177 |
16300 |
第100轮 |
19004 |
27653 |
16473 |
平均结果 |
19232 |
27760 |
16867 |
通过测试验证结果来看,在当前数据集规模下,双轴快排DivalQuickSort表现优异。注:Java中TimSort主要运用引用类型的集合排序中,本次数据验证并未加入比较。
5. 总结与探讨
由于Java提供了很方便的排序API,所以在平时的需求使用过程中一般都是短短几行代码调用使用完整排序工作,这也是Java作为一门流行语言最基本的职责所在。当然也会导致我们开发者容易忽视其原理,不能够学习到里面的精髓。
文中一起了解学习了TimSort算法和DivalQuickSort的排序思想与java实现。作为基本排序实现被广泛的应用,肯定有其值得学习与借鉴的地方。可以得知工业用排序算法,通常都不是一种算法,而是根据特定条件下的多种算法混合而成,实际上平时很多使用的经典数据结构都不是一种类型或者一种方式,比如HashMap中随着数据量大小有链表与红黑树的转化,再比如Redis中的各种数据结构不都是一种实现。这些经典优秀的实现都用到了诸如此类的思想。