I have a 2D array, where each row contains 6 integers that are already sorted in ascending order. Example:
我有一个2D数组,其中每行包含6个已按升序排序的整数。例:
1 2 3 4 5 6
6 8 9 10 13 15
1 4 5 6 7 9
1 4 5 6 7 8
3 18 19 20 25 34
Expected output:
1 2 3 4 5 6
1 4 5 6 7 8
1 4 5 6 7 9
3 18 19 20 25 34
6 8 9 10 13 15
The actual data contains anywhere from 8m to 33m records like this. I'm trying to determine the fastest way to sort this array. I currently have some working code using qsort:
实际数据包含8米到33米这样的记录。我正在尝试确定对此数组进行排序的最快方法。我目前使用qsort有一些工作代码:
qsort call:
qsort(allRecords, lineCount, sizeof(int*), cmpfunc);
cmpfunc:
int cmpfunc (const void * a, const void * b)
{
const int *rowA = *(const int **)a;
const int *rowB = *(const int **)b;
if (rowA[0] > rowB[0]) return 1;
if (rowA[0] < rowB[0]) return -1;
if (rowA[1] > rowB[1]) return 1;
if (rowA[1] < rowB[1]) return -1;
if (rowA[2] > rowB[2]) return 1;
if (rowA[2] < rowB[2]) return -1;
if (rowA[3] > rowB[3]) return 1;
if (rowA[3] < rowB[3]) return -1;
if (rowA[4] > rowB[4]) return 1;
if (rowA[4] < rowB[4]) return -1;
if (rowA[5] > rowB[5]) return 1;
if (rowA[5] < rowB[5]) return -1;
return 0;
}
For the sample 33 million records, it takes about 35.6 seconds (gcc -O1), which is pretty fast, but I'm wondering if there's any faster way to do it given the pre-sorted values in each row.
对于样本3300万条记录,它需要大约35.6秒(gcc -O1),这非常快,但我想知道是否有更快的方法来执行它,因为每行中有预先排序的值。
This naturally lends to data where the most common first digit is 1, so in a 33m record file, there might be 12m records starting with 1, then 8m records starting with 2, 5m records starting with 3, etc... I'm not sure if this would lend itself to one particular type of sorting over another (e.g. heapsort).
这自然适用于最常见的第一个数字为1的数据,因此在33m记录文件中,可能有12m记录以1开头,然后8m记录以2开头,5m记录以3开头等等...我是不确定这是否适合某种特定类型的排序而不是另一种(例如heapsort)。
My understand is that qsort has a fair amount of overhead because of all the times it has to call the function, so I'm hoping for some even faster performance.
我的理解是qsort有相当大的开销,因为它必须调用函数,所以我希望有一些更快的性能。
I'm not usually writing C code, so I'm very open to suggestions and criticism, since I'm piecing this all together from tutorials and other * questions/answers.
我通常不会编写C代码,因此我对建议和批评非常开放,因为我正在将这些内容与教程和其他*问题/答案拼凑在一起。
EDIT: As requested, my initialization code:
编辑:根据要求,我的初始化代码:
// Empty record
int recArray[6] = {0,0,0,0,0,0};
// Initialize allRecords
int** allRecords;
allRecords = (int**) malloc(lineCount*sizeof(int*));
for(i=0; i < lineCount; i++)
{
allRecords[i] = (int*) malloc(6*sizeof(int));
}
// Zero-out all records
for(i=0; i < lineCount; i++)
{
memcpy(allRecords[i], recArray, 6 * sizeof(int));
}
I'm still learning the right way to do stuff, so I wouldn't be surprised if I was doing it all wrong. Guidance in doing it right would be appreciated.
我还在学习正确的做事方式,所以如果我做错了,我也不会感到惊讶。这样做的指导意见将不胜感激。
Someone else asked about the range of values - I'm not sure if the range will change in the future, but at the current moment, the values are between 1 and 99.
其他人询问了价值范围 - 我不确定未来范围是否会发生变化,但目前这一数值介于1和99之间。
Also, for profiling - I built a small function that uses gettimeofday() to pull seconds/microseconds, and then compare before and after. I'm open to better ways. The output looks like:
另外,对于分析 - 我构建了一个小函数,它使用gettimeofday()来拉秒/微秒,然后比较之前和之后。我对更好的方式持开放态度。输出如下:
// <-- Here I capture the gettimeofday() structure output
Sorting...
Sorted.
Time Taken: 35.628882s // <-- Capture it again, show the difference
EDIT: Per @doynax - I now "pack" the 6 values of each line into an unsigned long long int:
编辑:Per @doynax - 我现在将每行的6个值“打包”到unsigned long long int中:
// Initialize allRecords
unsigned long long int* allRecords;
allRecords = (unsigned long long int*) malloc(lineCount*sizeof(unsigned long long int));
for(i=0; i < lineCount; i++)
{
allRecords[i] = 0;
}
...
// "Pack" current value (n0) into an unsigned long long int
if(recPos == 0) { lineSum += n0 * UINT64_C(1); }
else if(recPos == 1) { lineSum += n0 * UINT64_C(100); }
else if(recPos == 2) { lineSum += n0 * UINT64_C(10000); }
else if(recPos == 3) { lineSum += n0 * UINT64_C(1000000); }
else if(recPos == 4) { lineSum += n0 * UINT64_C(100000000); }
else if(recPos == 5) { lineSum += n0 * UINT64_C(10000000000); }
...
allRecords[linecount] = lineSum;
lineSum = 0;
I can also later "unpack" one of these unsigned long long int values successfully back into the original 6 ints.
我也可以稍后将这些无符号long long int值中的一个“解包”成原始的6个整数。
However, when I try to sort:
但是,当我尝试排序时:
qsort(allRecords, lineCount, sizeof(unsigned long long int), cmpfunc);
...
int cmpfunc (const void * a, const void * b)
{
if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
return 0;
}
...the results aren't sorted as expected. If I show the first and last lines before and after sorting using this:
...结果未按预期排序。如果我使用以下方法显示排序前后的第一行和最后一行:
printf("[%i] = %llu = %i,%i,%i,%i,%i,%i\n", j, lineSum, recArray[0]...recArray[5]);
The output is:
输出是:
First and last 5 rows before sorting:
[#] = PACKED INT64 = UNPACKED
[0] = 462220191706 = 6,17,19,20,22,46
[1] = 494140341005 = 5,10,34,40,41,49
[2] = 575337201905 = 5,19,20,37,53,57
[3] = 504236262316 = 16,23,26,36,42,50
[4] = 534730201912 = 12,19,20,30,47,53
[46] = 595648302516 = 16,25,30,48,56,59
[47] = 453635251108 = 8,11,25,35,36,45
[48] = 403221161202 = 2,12,16,21,32,40
[49] = 443736310604 = 4,6,31,36,37,44
[50] = 575248312821 = 21,28,31,48,52,57
First and last 5 rows after sorting:
[0] = 403221161202 = 2,12,16,21,32,40
[1] = 413218141002 = 2,10,14,18,32,41
[2] = 443736310604 = 4,6,31,36,37,44
[3] = 444127211604 = 4,16,21,27,41,44
[4] = 453028070302 = 2,3,7,28,30,45
[46] = 585043260907 = 7,9,26,43,50,58
[47] = 593524170902 = 2,9,17,24,35,59
[48] = 595248392711 = 11,27,39,48,52,59
[49] = 595251272612 = 12,26,27,51,52,59
[50] = 595648302516 = 16,25,30,48,56,59
I'm guessing I'm somehow comparing the wrong values (e.g. pointer values instead of the actual values), but I'm not quite sure what the right syntax is.
我猜我在某种程度上比较了错误的值(例如指针值而不是实际值),但我不太清楚正确的语法是什么。
On the plus side, it's blazing fast this way. :)
从好的方面来说,这种方式很快。 :)
Sorting 33m 64-bit ints takes about 4-5 seconds (at least in its current, mistaken form).
对33m 64位整数进行排序大约需要4-5秒(至少在其当前的错误形式中)。
2 个解决方案
#1
3
I can't resist a good optimization challenge.
我无法抗拒好的优化挑战。
Initial Thoughts
My first thought in seeing the problem was to use a radix sort, specifically a radix sort with an internal counting sort. (And it's nice to see that some of the comments agreed with me, too!) I've used radix sort with internal counting sort in other projects, and while quicksort can outperform it for small datasets, it leaves quicksort in the dust for large datasets.
我首先想到的问题是使用基数排序,特别是带有内部计数排序的基数排序。 (并且很高兴看到一些评论也同意我的意见!)我在其他项目中使用了基数排序和内部计数排序,而快速排序可以在小数据集中超越它,它可以快速排除大量数据数据集。
I went for radix sort because comparison sorts like quicksort have a well-known performance limit of O(n lg n). Because n is pretty big here, that "lg n" part is also pretty big — for 33 million records, you can reasonably expect it will take "some constant time" times 33000000 times lg(33000000), and lg(33000000) is about 25. Sorts like radix sort and counting sort are "non-comparing" sorts, so they can cut under that O(n lg n) boundary by having special knowledge of the data — in this case, they can go much faster because we know the data is small integers.
我选择了基数排序,因为像quicksort这样的比较排序具有众所周知的O(n lg n)性能限制。因为n在这里相当大,“lg n”部分也相当大 - 对于3300万条记录,你可以合理地预期它将需要“一些恒定时间”时间3300万次lg(33000000),并且lg(33000000)是关于25.像基数排序和计数排序这样的排序是“非比较”排序,因此他们可以通过对数据的特殊了解来削减O(n lg n)边界 - 在这种情况下,它们可以更快,因为我们知道数据是小整数。
I used the same mash-the-values-together code that everybody else did, combining each set of int
values into a single BigInt
, but I went for bit-shifts and masking rather than multiplication and modulus, since bit operations tend to be faster overall.
我使用了与其他人一样的mash-the-values-together代码,将每组int值组合到一个BigInt中,但我选择了位移和屏蔽而不是乘法和模数,因为位操作往往更快总体。
My implementation (see below) performs some constant-time setup computations, and then it iterates over the data exactly 7 times to sort it — and "7 times" not only beats the pants off "25 times" algorithmically, but my implementation is careful to avoid touching the memory more often than necessary so that it plays as well as it can with CPU caches too.
我的实现(见下文)执行一些恒定时间的设置计算,然后它将数据完全重复7次以对其进行排序 - 并且“7次”不仅在算法上击败了“25次”,而且我的实现很谨慎避免更频繁地触摸内存,以便它也可以与CPU缓存一起播放。
For reasonable source data, I wrote a little program that generated 33 million records that looked vaguely like your originals. Each row is in sorted order, and slightly biased toward smaller values. (You can find my sixsample.c
sample-dataset program below.)
对于合理的源数据,我写了一个小程序,生成了3300万条记录,看起来有点像你的原件。每行按排序顺序排列,略微偏向较小的值。 (您可以在下面找到我的sixsample.c样本数据集程序。)
Performance Comparison
Using something very close to your qsort()
implementation, I got these numbers over three runs, and they aren't too far off from what you were getting (different CPU, obviously):
使用非常接近你的qsort()实现的东西,我在三次运行中获得了这些数字,并且它们与你得到的相差不远(显然不同的CPU):
qsort:
Got 33000000 lines. Sorting.
Sorted lines in 5 seconds and 288636 microseconds.
Sorted lines in 5 seconds and 415553 microseconds.
Sorted lines in 5 seconds and 242454 microseconds.
Using my radix-sort-with-counting-sort implementation, I got these numbers over three runs:
使用我的radix-sort-with-counting-sort实现,我在三次运行中获得了这些数字:
Radix sort with internal counting sort, and cache-optimized:
Got 33000000 lines. Sorting.
Sorted lines in 0 seconds and 749285 microseconds.
Sorted lines in 0 seconds and 833474 microseconds.
Sorted lines in 0 seconds and 761943 microseconds.
5.318 seconds vs 0.781 seconds to sort the same 33 million records. That's 6.8x faster!
5.318秒vs 0.781秒对同样的3300万条记录进行排序。这快6.8倍!
Just to be sure, I diff
ed the output from each run to guarantee the results were the same — you really can sort tens of millions of records in under a second!
可以肯定的是,我将每次运行的输出分开以保证结果是相同的 - 你真的可以在一秒钟内排序数千万条记录!
Deeper Dive
The basic principles of a radix sort and a counting sort are described well on Wikipedia, and in one of my favorite books, Introduction to Algorithms, so I won't repeat the basics here. Instead, I'll describe a bit about how my implementation differs from the textbook forms to run a bit faster.
基础排序和计数排序的基本原理在*上有很好的描述,在我最喜欢的书之一,算法导论中,所以我不再重复这里的基础知识。相反,我将稍微介绍一下我的实现与教科书表单的不同之处,以便更快地运行。
The normal implementation of these algorithms is something like this (in Python-esque pseudocode):
这些算法的正常实现是这样的(在Python-esque伪代码中):
def radix-sort(records):
temp = []
for column = 0 to columns:
counting-sort(records, temp, column)
copy 'temp' back into 'records'
def counting-sort(records, dest, column):
# Count up how many of each value there is at this column.
counts = []
foreach record in records:
counts[record[column]] += 1
transform 'counts' into offsets
copy from 'records' to 'dest' using the calculated offsets
This certainly works, but it comes with a few uglinesses. The "count up the values" step involves counting values in the whole dataset for every column. The "copy temp back into records" step involves copying the whole dataset for every column column. And those are in addition to the required step of "copy from records-to-dest in sorted order". We spin over the data three times for every column; it would be nice to do that less often!
这肯定有效,但它带有一些uglinesses。 “计算值”步骤涉及计算每列的整个数据集中的值。 “将温度复制回记录”步骤涉及复制每个列列的整个数据集。除了“按照排序顺序从记录到目录复制”所需的步骤之外,还有这些步骤。我们为每列旋转数据三次;不那么经常这样做会很好!
My implementation mixes these together: Rather than calculating the counts for each column separately, they're calculated together as a first pass over the data. Then the counts are transformed in bulk into offsets; and finally, the data is sorted into place. In addition, rather than copying from temp
to records
at each stage, I simply page-flip: The data alternates where it's "coming from" every other iteration. My pseudocode is something more like this:
我的实现将这些混合在一起:它们不是分别计算每列的计数,而是作为数据的第一遍计算在一起。然后将计数大量转换为偏移量;最后,数据被分类到位。另外,我不是在每个阶段从temp复制到记录,而是简单地翻页:数据交替出现在每隔一次迭代“来自”的地方。我的伪代码更像是这样的:
def radix-sort(records):
# Count up how many of each value there is in every column,
# reading each record only once.
counts = [,]
foreach record in records:
foreach column in columns:
counts[column, record[column]] += 1
transform 'counts' for each column into offsets for each column
temp = []
for column = 0 to columns step 2:
sort-by-counts(records, temp, column)
sort-by-counts(temp, records, column+1)
def sort-by-counts(records, dest, counts, column):
copy from 'records' to 'dest' using the calculated offsets
This avoids spinning over the data any more than necessary: One pass for setup, and six passes to sort it, all in order. You can't make the CPU cache much happier.
这样可以避免超出必要的数据:一次通过设置,六次通过排序,所有这些都按顺序排列。你不能让CPU缓存更快乐。
The C Code
C代码
Here's my complete implementation of sixsort.c
. It includes your qsort
solution as well, commented-out, so it's easier for you to compare the two. It includes lots of documentation, and all of the I/O and metrics code, so it's somewhat long, but its completeness should help you to understand the solution better:
这是我对sixsort.c的完整实现。它还包括你的qsort解决方案,注释掉了,所以你更容易比较两者。它包含大量文档以及所有I / O和指标代码,所以它有点长,但它的完整性应该可以帮助您更好地理解解决方案:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/time.h>
/* Configuration. */
#define NUM_COLUMNS 6 /* How many columns to read/sort */
#define MAX_VALUE 100 /* Maximum value allowed in each column, plus one */
#define BITSHIFT 7 /* How far to shift bits when combining values */
/* Note: BITSHIFT should be as small as possible, but MAX_VALUE must be <= 2**BITSHIFT. */
/* Highest value representable under the given BITSHIFT. */
#define BITMASK ((1 << BITSHIFT) - 1)
/* The type we're going to pack the integers into. The highest bits will get the
leftmost number, while the lowest bits will get the rightmost number. */
typedef unsigned long long BigInt;
/*-------------------------------------------------------------------------------------------------
** Radix Sorting.
*/
/* Move a set of items from src to dest, using the provided counts to decide where each
item belongs. src and dest must not be the same arrays. */
static void SortByCounts(BigInt *src, BigInt *dest, size_t *counts, size_t totalCount, int bitshift)
{
BigInt *temp, *end;
for (temp = src, end = src + totalCount; temp < end; temp++) {
BigInt value = *temp;
int number = (int)((value >> bitshift) & BITMASK);
int offset = counts[number]++;
dest[offset] = value;
}
}
/* Use a radix sort with an internal counting sort to sort the given array of values.
This iterates over the source data exactly 7 times, which for large
'count' (i.e., count > ~2000) is typically much more efficient than O(n lg n)
algorithms like quicksort or mergesort or heapsort. (That said, the recursive O(n lg n)
algorithms typically have better memory locality, so which solution wins overall
may vary depending on your CPU and memory system.) */
static void RadixSortWithInternalCountingSort(BigInt *values, size_t count)
{
size_t i, j;
BigInt *temp, *end;
size_t counts[NUM_COLUMNS][MAX_VALUE];
size_t oldCount;
/* Reset the counts of each value in each column to zero, quickly.
This takes MAX_VALUE * NUM_COLUMNS time (i.e., constant time). */
memset(counts, 0, sizeof(counts));
/* Count up how many there are of each value in this column. This iterates over
the whole dataset exactly once, processing each set of values in full, so it
takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (temp = values, end = values + count; temp < end; temp++) {
BigInt value = *temp;
for (i = 0; i < NUM_COLUMNS; i++) {
counts[i][(int)((value >> (i * BITSHIFT)) & BITMASK)]++;
}
}
/* Transform the counts into offsets. This only transforms the counts array,
so it takes MAX_VALUE * NUM_COLUMNS operations (i.e., constant time). */
size_t totals[NUM_COLUMNS];
for (i = 0; i < NUM_COLUMNS; i++) {
totals[i] = 0;
}
for (i = 0; i < MAX_VALUE; i++) {
for (j = 0; j < NUM_COLUMNS; j++) {
oldCount = counts[j][i];
counts[j][i] = totals[j];
totals[j] += oldCount;
}
}
temp = malloc(sizeof(BigInt) * count);
/* Now perform the actual sorting, using the counts to tell us how to move
the items. Each call below iterates over the whole dataset exactly once,
so this takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (i = 0; i < NUM_COLUMNS; i += 2) {
SortByCounts(values, temp, counts[i ], count, i * BITSHIFT);
SortByCounts(temp, values, counts[i+1], count, (i+1) * BITSHIFT);
}
free(temp);
}
/*-------------------------------------------------------------------------------------------------
** Built-in Quicksorting.
*/
static int BigIntCompare(const void *a, const void *b)
{
BigInt av = *(BigInt *)a;
BigInt bv = *(BigInt *)b;
if (av < bv) return -1;
if (av > bv) return +1;
return 0;
}
static void BuiltInQuicksort(BigInt *values, size_t count)
{
qsort(values, count, sizeof(BigInt), BigIntCompare);
}
/*-------------------------------------------------------------------------------------------------
** File reading.
*/
/* Read a single integer from the given string, skipping initial whitespace, and
store it in 'returnValue'. Returns a pointer to the end of the integer text, or
NULL if no value can be read. */
static char *ReadInt(char *src, int *returnValue)
{
char ch;
int value;
/* Skip whitespace. */
while ((ch = *src) <= 32 && ch != '\r' && ch != '\n') src++;
/* Do we have a valid digit? */
if ((ch = *src++) < '0' || ch > '9')
return NULL;
/* Collect digits into a number. */
value = 0;
do {
value *= 10;
value += ch - '0';
} while ((ch = *src++) >= '0' && ch <= '9');
src--;
/* Return what we did. */
*returnValue = value;
return src;
}
/* Read a single line of values from the input into 'line', and return the number of
values that were read on this line. */
static int ReadLine(FILE *fp, BigInt *line)
{
int numValues;
char buffer[1024];
char *src;
int value;
BigInt result = 0;
if (fgets(buffer, 1024, fp) == NULL) return 0;
buffer[1023] = '\0';
numValues = 0;
src = buffer;
while ((src = ReadInt(src, &value)) != NULL) {
result |= ((BigInt)value << ((NUM_COLUMNS - ++numValues) * BITSHIFT));
}
*line = result;
return numValues;
}
/* Read from file 'fp', which should consist of a sequence of lines with
six decimal integers on each, and write the parsed, packed integer values
to a newly-allocated 'values' array. Returns the number of lines read. */
static size_t ReadInputFile(FILE *fp, BigInt **values)
{
BigInt line;
BigInt *dest;
size_t count, max;
count = 0;
dest = malloc(sizeof(BigInt) * (max = 256));
while (ReadLine(fp, &line)) {
if (count >= max) {
size_t newmax = max * 2;
BigInt *temp = malloc(sizeof(BigInt) * newmax);
memcpy(temp, dest, sizeof(BigInt) * count);
free(dest);
max = newmax;
dest = temp;
}
dest[count++] = line;
}
*values = dest;
return count;
}
/*-------------------------------------------------------------------------------------------------
** File writing.
*/
/* Given a number from 0 to 999 (inclusive), write its string representation to 'dest'
as fast as possible. Returns 'dest' incremented by the number of digits written. */
static char *WriteNumber(char *dest, unsigned int number)
{
if (number >= 100) {
dest += 3;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10, number /= 10;
dest[-3] = '0' + number % 10;
}
else if (number >= 10) {
dest += 2;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10;
}
else {
dest += 1;
dest[-1] = '0' + number;
}
return dest;
}
/* Write a single "value" (one line of content) to the output file. */
static void WriteOutputLine(FILE *fp, BigInt value)
{
char buffer[1024];
char *dest = buffer;
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
if (i > 0)
*dest++ = ' ';
int number = (value >> (BITSHIFT * (NUM_COLUMNS - i - 1))) & BITMASK;
dest = WriteNumber(dest, (unsigned int)number);
}
*dest++ = '\n';
*dest = '\0';
fwrite(buffer, 1, dest - buffer, fp);
}
/* Write the entire output file as a sequence of values in columns. */
static void WriteOutputFile(FILE *fp, BigInt *values, size_t count)
{
while (count-- > 0) {
WriteOutputLine(fp, *values++);
}
}
/*-------------------------------------------------------------------------------------------------
** Timeval support.
*/
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
{
/* Perform the carry for the later subtraction by updating y. */
if (x->tv_usec < y->tv_usec) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
y->tv_usec -= 1000000 * nsec;
y->tv_sec += nsec;
}
if (x->tv_usec - y->tv_usec > 1000000) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000;
y->tv_usec += 1000000 * nsec;
y->tv_sec -= nsec;
}
/* Compute the time remaining to wait. tv_usec is certainly positive. */
result->tv_sec = x->tv_sec - y->tv_sec;
result->tv_usec = x->tv_usec - y->tv_usec;
/* Return 1 if result is negative. */
return x->tv_sec < y->tv_sec;
}
/*-------------------------------------------------------------------------------------------------
** Main.
*/
int main(int argc, char **argv)
{
BigInt *values;
size_t count;
FILE *fp;
struct timeval startTime, endTime, deltaTime;
if (argc != 3) {
fprintf(stderr, "Usage: sixsort input.txt output.txt\n");
return -1;
}
printf("Reading %s...\n", argv[1]);
if ((fp = fopen(argv[1], "r")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for reading.\n", argv[1]);
return -1;
}
count = ReadInputFile(fp, &values);
fclose(fp);
printf("Got %d lines. Sorting.\n", (int)count);
gettimeofday(&startTime, NULL);
RadixSortWithInternalCountingSort(values, count);
/*BuiltInQuicksort(values, count);*/
gettimeofday(&endTime, NULL);
timeval_subtract(&deltaTime, &endTime, &startTime);
printf("Sorted lines in %d seconds and %d microseconds.\n",
(int)deltaTime.tv_sec, (int)deltaTime.tv_usec);
printf("Writing %d lines to %s.\n", (int)count, argv[2]);
if ((fp = fopen(argv[2], "w")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for writing.\n", argv[2]);
return -1;
}
WriteOutputFile(fp, values, count);
fclose(fp);
free(values);
return 0;
}
Also, for reference, here's my crappy little sample-dataset producer sixsample.c
, using a predictable LCG random-number generator so it always generates the same sample data:
此外,作为参考,这里是我糟糕的小样本数据集生成器sixsample.c,使用可预测的LCG随机数生成器,因此它始终生成相同的样本数据:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <math.h>
#define NUM_COLUMNS 6
#define MAX_VALUE 35
unsigned int RandSeed = 0;
/* Generate a random number from 0 to 65535, inclusive, using a simple (fast) LCG.
The numbers below are the same as used in the ISO/IEC 9899 proposal for C99/C11. */
static int Rand()
{
RandSeed = RandSeed * 1103515245 + 12345;
return (int)(RandSeed >> 16);
}
static int CompareInts(const void *a, const void *b)
{
return *(int *)a - *(int *)b;
}
static void PrintSortedRandomLine()
{
int values[NUM_COLUMNS];
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
values[i] = 1 + (int)(pow((double)Rand() / 65536.0, 2.0) * MAX_VALUE);
}
qsort(values, NUM_COLUMNS, sizeof(int), CompareInts);
/* Lame. */
printf("%d %d %d %d %d %d\n",
values[0], values[1], values[2], values[3], values[4], values[5]);
}
int main(int argc, char **argv)
{
unsigned int numLines;
unsigned int i;
if (argc < 2 || argc > 3) {
fprintf(stderr, "Usage: sixsample numLines [randSeed]\n");
return -1;
}
numLines = atoi(argv[1]);
if (argc > 2)
RandSeed = atoi(argv[2]);
for (i = 0; i < numLines; i++) {
PrintSortedRandomLine();
}
return 0;
}
And finally, here's a Makefile
that can build them the same way I did, on Ubuntu 16, with gcc 5.4.0:
最后,这是一个Makefile,它可以像我一样在Ubuntu 16上使用gcc 5.4.0构建它们:
all: sixsort sixsample
sixsort: sixsort.c
gcc -O6 -Wall -o sixsort sixsort.c
sixsample: sixsample.c
gcc -O -Wall -o sixsample sixsample.c
clean:
rm -f sixsort sixsample
Further Possible Optimizations
进一步可能的优化
I considered further optimizing some parts of this by writing them in pure assembly, but gcc does an excellent job of inlining the functions, unrolling the loops, and generally hitting this with most of the optimizations I would've otherwise considered doing myself in assembly, so it would be difficult to improve it that way. The core operations don't lend themselves very well to SIMD instructions, so gcc's version is likely as fast as it gets.
我考虑通过在纯汇编中编写它们来进一步优化其中的一些部分,但是gcc在内联函数,展开循环方面做得非常出色,并且通常用大多数我认为在汇编中自己做的优化来实现这一点,因此很难以这种方式改进它。核心操作不能很好地适应SIMD指令,因此gcc的版本可能会尽可能快。
This would probably run faster if I compiled it as a 64-bit process, since the CPU would be able to hold each complete record in a single register, but my test Ubuntu VM was 32-bit, so c'est la vie.
如果我将它编译为64位进程,这可能会运行得更快,因为CPU可以将每个完整记录保存在一个寄存器中,但我的测试Ubuntu VM是32位,所以c'est la vie。
Given that the values themselves are all fairly small, you might be able to improve my solution one further by combining code points: So instead of performing six counting sorts, one subsort on each of the seven-bit columns, you could divide up the same 42 bits in other ways: You could perform three subsorts on each of the 14-bit columns. This would require storing 3*2^14 (=3*16384=49152) counts and offsets in memory instead of 6*2^7 (=6*128=768), but since memory is fairly cheap, and the data would still fit in the CPU cache, it might be worth trying. And since the original data is two-digit decimal integers, you could even slice it down the middle and perform a pair of subsorts on each of two 6-decimal-digit columns of values (requiring 2*10^6=2000000 counts and offsets, which still might fit in your CPU cache, depending on your CPU).
鉴于值本身都非常小,您可以通过组合代码点来进一步改进我的解决方案:因此,不是执行六个计数排序,而是在每个七位列上有一个子排序,您可以将它们分开其他方式为42位:您可以在每个14位列上执行三个子输出。这需要在内存中存储3 * 2 ^ 14(= 3 * 16384 = 49152)计数和偏移而不是6 * 2 ^ 7(= 6 * 128 = 768),但由于内存相当便宜,数据仍然存在适合CPU缓存,可能值得尝试。由于原始数据是两位十进制整数,您甚至可以将其切入中间,并在两个6位十进制数值列中的每一列上执行一对子排序(需要2 * 10 ^ 6 = 2000000计数和偏移,仍然可能适合您的CPU缓存,具体取决于您的CPU)。
You could also parallelize it! My implementation lights up one CPU hard, but leaves the rest idle. So another technique to make this faster yet could be to divide the source data into chunks, one per CPU core, sort each chunk independently, and then perform a mergesort-like 'merge' operation at the end to combine the results together. If you find yourself running this computation a lot, parallelizing the work might be something to look into.
你也可以并行化它!我的实现很难点亮一个CPU,但剩下的就是空闲。因此,另一种使此更快的技术可能是将源数据划分为块,每个CPU核心一个,独立地对每个块进行排序,然后在最后执行类似mergesort的“合并”操作以将结果组合在一起。如果你发现自己经常运行这个计算,那么可能需要对工作进行并行化。
Conclusion and Other Thoughts
结论和其他想法
Although I didn't test it myself, it might be fair to compare the performance of a hand-written quicksort, since qsort()
includes function-call overhead when comparing values. I suspect a hand-written solution would beat the 5.3 seconds of qsort()
, but it probably wouldn't compete against the radix sort, both due to quicksort's nonlinear growth and its less-favorable CPU-cache characteristics (it would read each value more often than 7 times on average, so it would probably cost more to pull the data from RAM).
虽然我自己没有对它进行测试,但比较手写快速排序的性能可能是公平的,因为qsort()在比较值时包含函数调用开销。我怀疑手写的解决方案会超过qsort()的5.3秒,但它可能不会与基数排序竞争,这是由于快速排序的非线性增长和不太有利的CPU缓存特性(它会读取每个值)平均而言往往超过7次,因此从RAM中提取数据的成本可能更高。
Also, the cost of the sorting is now dwarfed by the cost of the I/O to read the text file into memory and write the result back out again. Even with my custom parsing code, it still takes several seconds to read the 33 million records, a fraction of a second to sort them, and a few seconds to write them back out again. If your data is already in RAM, that might not matter, but if it's on a disk somewhere, you'll need to start finding ways to optimize that too.
此外,排序的成本现在与将文本文件读入内存并再次将结果写回的I / O成本相形见绌。即使使用我的自定义解析代码,读取3300万条记录仍然需要几秒钟,对它们进行排序只需要几分之一秒,再过几秒就可以将它们写回来。如果您的数据已经在RAM中,那可能无关紧要,但如果它在某个磁盘上,您还需要开始寻找优化方法。
But as the goal of the question was to optimize the sorting itself, sorting 33 million records in only three-quarters of a second isn't too shabby, I think.
但是,由于问题的目标是优化排序本身,我认为,只有四分之三秒的时间排序3300万条记录并不算太糟糕。
#2
2
Jonathan Leffler's comment about reordering the packing is spot on and I had the same thought when looking over your code. The following would be my approach:
Jonathan Leffler关于重新排序包装的评论很有见,在查看代码时我也有同样的想法。以下是我的方法:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <string.h> // for memcpy
#define ROW_LENGTH 6
#define ROW_COUNT 15
// 1, 2, 3, 4, 5, 6, 6, 8, 9, 10, 13, 15, 1, 4, 5, 6, 7, 9, 1, 4, 5, 6, 7, 8, 3, 18, 19, 20, 25, 34
/*
1 2 3 4 5 6
6 8 9 10 13 15
1 4 5 6 7 9
1 4 5 6 7 8
3 18 19 20 25 34
*/
// Insertion sorting taken from https://*.com/a/2789530/2694511 with modification
static __inline__ int sortUlliArray(unsigned long long int *d, int length){
int i, j;
for (i = 1; i < length; i++) {
unsigned long long int tmp = d[i];
for (j = i; j >= 1 && tmp < d[j-1]; j--)
d[j] = d[j-1];
d[j] = tmp;
}
return i; // just to shutup compiler
}
int cmpfunc (const void * a, const void * b)
{
if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
return 0;
}
int main(){
int array[ROW_COUNT][ROW_LENGTH],
decodedResultsArray[ROW_COUNT][ROW_LENGTH];
const int rawData[] = { 1, 2, 3, 4, 5, 6,
6, 8, 9, 10, 13, 15,
1, 4, 5, 6, 7, 9,
1, 4, 5, 6, 7, 8,
3, 18, 19, 20, 25, 34,
6,17,19,20,22,46,
5,10,34,40,41,49,
5,19,20,37,53,57,
16,23,26,36,42,50,
12,19,20,30,47,53,
16,25,30,48,56,59,
8,11,25,35,36,45,
2,12,16,21,32,40,
4,6,31,36,37,44,
21,28,31,48,52,57
};
memcpy(array, rawData, sizeof(rawData)/sizeof(*rawData)); // copy elements into array memory
// Sort
// precompute keys
unsigned long long int *rowSums = calloc(ROW_COUNT, sizeof(unsigned long long int));
unsigned long long int *sortedSums = rowSums ? calloc(ROW_COUNT, sizeof(unsigned long long int)) : NULL; // if rowSums is null, don't bother trying to allocate.
if(!rowSums || !sortedSums){
free(rowSums);
free(sortedSums);
fprintf(stderr, "Failed to allocate memory!\n");
fflush(stderr); // should be unnecessary, but better to make sure it gets printed
exit(100);
}
int i=0, j=0, k=0;
for(; i < ROW_COUNT; i++){
rowSums[i] = 0; // this should be handled by calloc, but adding this for debug
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100; // probably not the most efficient way to compute this, but this is meant more as an example/proof of concept
unsigned long long int iHere = array[i][j];
rowSums[i] += (iHere * iScalar);
// printf("DEBUG ITERATION REPORT\n\t\tRow #%d\n\t\tColumn #%d\n\t\tiScalar: %llu\n\t\tiHere: %llu\n\t\tCurrent Sum for Row: %llu\n\n", i, j, iScalar, iHere, rowSums[i]);
fflush(stdout);
}
}
memcpy(sortedSums, rowSums, sizeof(unsigned long long int)*ROW_COUNT);
// Some debugging output:
/*
printf("Uncopied Sums:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, rowSums[i]);
printf("Memcopyed sort array:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
*/
clock_t begin = clock();
//qsort(sortedSums, ROW_COUNT, sizeof(unsigned long long int), cmpfunc);
sortUlliArray(sortedSums, ROW_COUNT);
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("Time for sort: %lf\n", time_spent);
printf("Before sort array:\n");
for(i=0; i<ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
printf("Unsorted[%d][%d] = %d\n", i, j, array[i][j]);
}
}
printf("Values of sorted computed keys:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
// Unpack:
for(i=0; i < ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100;
unsigned long long int removalAmount = sortedSums[i]/iScalar;
decodedResultsArray[i][j] = removalAmount;
sortedSums[i] -= (removalAmount*iScalar);
// DEBUG:
// printf("Decoded Result for decoded[%d][%d] = %d\n", i, j, decodedResultsArray[i][j]);
}
}
printf("\nFinal Output:\n");
for(i=0; i < ROW_COUNT; i++){
printf("Row #%d: %d", i, decodedResultsArray[i][0]);
for(j=1; j < ROW_LENGTH; j++){
printf(", %d", decodedResultsArray[i][j]);
}
puts("");
}
fflush(stdout);
free(rowSums);
free(sortedSums);
return 1;
}
Do note, this is not all optimized for the maximum efficiency and it is littered with debug output statements, but nonetheless, it's a proof of concept on how the packing can work. Also, given the number of rows you have to handle, you will probably be better to use qsort()
, but I have it using sortUlliArray(...)
(which is a modified version of the Insert-sort function from this * answer). You'll have to give it a test to see what performs best for your case.
请注意,这并不是为了实现最高效率而进行了优化,而是充满了调试输出语句,但它仍然是包装如何工作的概念证明。另外,考虑到你必须处理的行数,你可能会更好地使用qsort(),但我使用sortUlliArray(...)(这是*回答中Insert-sort函数的修改版本) )。你必须给它一个测试,看看哪种方法最适合你的情况。
All in all, the final output from running this code on the 15 hardcoded rows is:
总而言之,在15个硬编码行上运行此代码的最终输出是:
Row #0: 1, 2, 3, 4, 5, 6
Row #1: 1, 4, 5, 6, 7, 8
Row #2: 1, 4, 5, 6, 7, 9
Row #3: 2, 12, 16, 21, 32, 40
Row #4: 3, 18, 19, 20, 25, 34
Row #5: 4, 6, 31, 36, 37, 44
Row #6: 5, 10, 34, 40, 41, 49
Row #7: 5, 19, 20, 37, 53, 57
Row #8: 6, 8, 9, 10, 13, 15
Row #9: 6, 17, 19, 20, 22, 46
Row #10: 8, 11, 25, 35, 36, 45
Row #11: 12, 19, 20, 30, 47, 53
Row #12: 16, 23, 26, 36, 42, 50
Row #13: 16, 25, 30, 48, 56, 59
Row #14: 21, 28, 31, 48, 52, 57
So, this does appear to handle the cases where the numbers are very similar, which was an issue attributable to the order the numbers were packed in.
因此,这似乎可以处理数字非常相似的情况,这是一个可归因于数字包装顺序的问题。
Anyway, the code above should work, but it is meant as an example, so I will leave it to you to apply necessary optimizations.
无论如何,上面的代码应该可行,但它只是作为一个例子,所以我将留给你应用必要的优化。
Code was tested on a MacBook Air 64-bit with 1.6 GHz Intel Core i5 and 4 GB 1600 MHz DDR3. So, a pretty weak CPU and slow memory, but it was able to do the sort for the 15 rows 0.004 milliseconds, so fairly fast, in my opinion. (That is just a measure of the sort function's speed for the above test case, not for the pre-packing or unpacking speeds, as those could use some optimization.)
代码在具有1.6 GHz Intel Core i5和4 GB 1600 MHz DDR3的MacBook Air 64位上进行了测试。所以,一个非常弱的CPU和缓慢的内存,但它能够对这15行进行排序0.004毫秒,在我看来相当快。 (这只是上述测试用例的排序函数速度的度量,而不是预打包或解包速度,因为它们可以使用一些优化。)
Major credit goes to Doynax and Jonathan Leffler.
主要归功于Doynax和Jonathan Leffler。
#1
3
I can't resist a good optimization challenge.
我无法抗拒好的优化挑战。
Initial Thoughts
My first thought in seeing the problem was to use a radix sort, specifically a radix sort with an internal counting sort. (And it's nice to see that some of the comments agreed with me, too!) I've used radix sort with internal counting sort in other projects, and while quicksort can outperform it for small datasets, it leaves quicksort in the dust for large datasets.
我首先想到的问题是使用基数排序,特别是带有内部计数排序的基数排序。 (并且很高兴看到一些评论也同意我的意见!)我在其他项目中使用了基数排序和内部计数排序,而快速排序可以在小数据集中超越它,它可以快速排除大量数据数据集。
I went for radix sort because comparison sorts like quicksort have a well-known performance limit of O(n lg n). Because n is pretty big here, that "lg n" part is also pretty big — for 33 million records, you can reasonably expect it will take "some constant time" times 33000000 times lg(33000000), and lg(33000000) is about 25. Sorts like radix sort and counting sort are "non-comparing" sorts, so they can cut under that O(n lg n) boundary by having special knowledge of the data — in this case, they can go much faster because we know the data is small integers.
我选择了基数排序,因为像quicksort这样的比较排序具有众所周知的O(n lg n)性能限制。因为n在这里相当大,“lg n”部分也相当大 - 对于3300万条记录,你可以合理地预期它将需要“一些恒定时间”时间3300万次lg(33000000),并且lg(33000000)是关于25.像基数排序和计数排序这样的排序是“非比较”排序,因此他们可以通过对数据的特殊了解来削减O(n lg n)边界 - 在这种情况下,它们可以更快,因为我们知道数据是小整数。
I used the same mash-the-values-together code that everybody else did, combining each set of int
values into a single BigInt
, but I went for bit-shifts and masking rather than multiplication and modulus, since bit operations tend to be faster overall.
我使用了与其他人一样的mash-the-values-together代码,将每组int值组合到一个BigInt中,但我选择了位移和屏蔽而不是乘法和模数,因为位操作往往更快总体。
My implementation (see below) performs some constant-time setup computations, and then it iterates over the data exactly 7 times to sort it — and "7 times" not only beats the pants off "25 times" algorithmically, but my implementation is careful to avoid touching the memory more often than necessary so that it plays as well as it can with CPU caches too.
我的实现(见下文)执行一些恒定时间的设置计算,然后它将数据完全重复7次以对其进行排序 - 并且“7次”不仅在算法上击败了“25次”,而且我的实现很谨慎避免更频繁地触摸内存,以便它也可以与CPU缓存一起播放。
For reasonable source data, I wrote a little program that generated 33 million records that looked vaguely like your originals. Each row is in sorted order, and slightly biased toward smaller values. (You can find my sixsample.c
sample-dataset program below.)
对于合理的源数据,我写了一个小程序,生成了3300万条记录,看起来有点像你的原件。每行按排序顺序排列,略微偏向较小的值。 (您可以在下面找到我的sixsample.c样本数据集程序。)
Performance Comparison
Using something very close to your qsort()
implementation, I got these numbers over three runs, and they aren't too far off from what you were getting (different CPU, obviously):
使用非常接近你的qsort()实现的东西,我在三次运行中获得了这些数字,并且它们与你得到的相差不远(显然不同的CPU):
qsort:
Got 33000000 lines. Sorting.
Sorted lines in 5 seconds and 288636 microseconds.
Sorted lines in 5 seconds and 415553 microseconds.
Sorted lines in 5 seconds and 242454 microseconds.
Using my radix-sort-with-counting-sort implementation, I got these numbers over three runs:
使用我的radix-sort-with-counting-sort实现,我在三次运行中获得了这些数字:
Radix sort with internal counting sort, and cache-optimized:
Got 33000000 lines. Sorting.
Sorted lines in 0 seconds and 749285 microseconds.
Sorted lines in 0 seconds and 833474 microseconds.
Sorted lines in 0 seconds and 761943 microseconds.
5.318 seconds vs 0.781 seconds to sort the same 33 million records. That's 6.8x faster!
5.318秒vs 0.781秒对同样的3300万条记录进行排序。这快6.8倍!
Just to be sure, I diff
ed the output from each run to guarantee the results were the same — you really can sort tens of millions of records in under a second!
可以肯定的是,我将每次运行的输出分开以保证结果是相同的 - 你真的可以在一秒钟内排序数千万条记录!
Deeper Dive
The basic principles of a radix sort and a counting sort are described well on Wikipedia, and in one of my favorite books, Introduction to Algorithms, so I won't repeat the basics here. Instead, I'll describe a bit about how my implementation differs from the textbook forms to run a bit faster.
基础排序和计数排序的基本原理在*上有很好的描述,在我最喜欢的书之一,算法导论中,所以我不再重复这里的基础知识。相反,我将稍微介绍一下我的实现与教科书表单的不同之处,以便更快地运行。
The normal implementation of these algorithms is something like this (in Python-esque pseudocode):
这些算法的正常实现是这样的(在Python-esque伪代码中):
def radix-sort(records):
temp = []
for column = 0 to columns:
counting-sort(records, temp, column)
copy 'temp' back into 'records'
def counting-sort(records, dest, column):
# Count up how many of each value there is at this column.
counts = []
foreach record in records:
counts[record[column]] += 1
transform 'counts' into offsets
copy from 'records' to 'dest' using the calculated offsets
This certainly works, but it comes with a few uglinesses. The "count up the values" step involves counting values in the whole dataset for every column. The "copy temp back into records" step involves copying the whole dataset for every column column. And those are in addition to the required step of "copy from records-to-dest in sorted order". We spin over the data three times for every column; it would be nice to do that less often!
这肯定有效,但它带有一些uglinesses。 “计算值”步骤涉及计算每列的整个数据集中的值。 “将温度复制回记录”步骤涉及复制每个列列的整个数据集。除了“按照排序顺序从记录到目录复制”所需的步骤之外,还有这些步骤。我们为每列旋转数据三次;不那么经常这样做会很好!
My implementation mixes these together: Rather than calculating the counts for each column separately, they're calculated together as a first pass over the data. Then the counts are transformed in bulk into offsets; and finally, the data is sorted into place. In addition, rather than copying from temp
to records
at each stage, I simply page-flip: The data alternates where it's "coming from" every other iteration. My pseudocode is something more like this:
我的实现将这些混合在一起:它们不是分别计算每列的计数,而是作为数据的第一遍计算在一起。然后将计数大量转换为偏移量;最后,数据被分类到位。另外,我不是在每个阶段从temp复制到记录,而是简单地翻页:数据交替出现在每隔一次迭代“来自”的地方。我的伪代码更像是这样的:
def radix-sort(records):
# Count up how many of each value there is in every column,
# reading each record only once.
counts = [,]
foreach record in records:
foreach column in columns:
counts[column, record[column]] += 1
transform 'counts' for each column into offsets for each column
temp = []
for column = 0 to columns step 2:
sort-by-counts(records, temp, column)
sort-by-counts(temp, records, column+1)
def sort-by-counts(records, dest, counts, column):
copy from 'records' to 'dest' using the calculated offsets
This avoids spinning over the data any more than necessary: One pass for setup, and six passes to sort it, all in order. You can't make the CPU cache much happier.
这样可以避免超出必要的数据:一次通过设置,六次通过排序,所有这些都按顺序排列。你不能让CPU缓存更快乐。
The C Code
C代码
Here's my complete implementation of sixsort.c
. It includes your qsort
solution as well, commented-out, so it's easier for you to compare the two. It includes lots of documentation, and all of the I/O and metrics code, so it's somewhat long, but its completeness should help you to understand the solution better:
这是我对sixsort.c的完整实现。它还包括你的qsort解决方案,注释掉了,所以你更容易比较两者。它包含大量文档以及所有I / O和指标代码,所以它有点长,但它的完整性应该可以帮助您更好地理解解决方案:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/time.h>
/* Configuration. */
#define NUM_COLUMNS 6 /* How many columns to read/sort */
#define MAX_VALUE 100 /* Maximum value allowed in each column, plus one */
#define BITSHIFT 7 /* How far to shift bits when combining values */
/* Note: BITSHIFT should be as small as possible, but MAX_VALUE must be <= 2**BITSHIFT. */
/* Highest value representable under the given BITSHIFT. */
#define BITMASK ((1 << BITSHIFT) - 1)
/* The type we're going to pack the integers into. The highest bits will get the
leftmost number, while the lowest bits will get the rightmost number. */
typedef unsigned long long BigInt;
/*-------------------------------------------------------------------------------------------------
** Radix Sorting.
*/
/* Move a set of items from src to dest, using the provided counts to decide where each
item belongs. src and dest must not be the same arrays. */
static void SortByCounts(BigInt *src, BigInt *dest, size_t *counts, size_t totalCount, int bitshift)
{
BigInt *temp, *end;
for (temp = src, end = src + totalCount; temp < end; temp++) {
BigInt value = *temp;
int number = (int)((value >> bitshift) & BITMASK);
int offset = counts[number]++;
dest[offset] = value;
}
}
/* Use a radix sort with an internal counting sort to sort the given array of values.
This iterates over the source data exactly 7 times, which for large
'count' (i.e., count > ~2000) is typically much more efficient than O(n lg n)
algorithms like quicksort or mergesort or heapsort. (That said, the recursive O(n lg n)
algorithms typically have better memory locality, so which solution wins overall
may vary depending on your CPU and memory system.) */
static void RadixSortWithInternalCountingSort(BigInt *values, size_t count)
{
size_t i, j;
BigInt *temp, *end;
size_t counts[NUM_COLUMNS][MAX_VALUE];
size_t oldCount;
/* Reset the counts of each value in each column to zero, quickly.
This takes MAX_VALUE * NUM_COLUMNS time (i.e., constant time). */
memset(counts, 0, sizeof(counts));
/* Count up how many there are of each value in this column. This iterates over
the whole dataset exactly once, processing each set of values in full, so it
takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (temp = values, end = values + count; temp < end; temp++) {
BigInt value = *temp;
for (i = 0; i < NUM_COLUMNS; i++) {
counts[i][(int)((value >> (i * BITSHIFT)) & BITMASK)]++;
}
}
/* Transform the counts into offsets. This only transforms the counts array,
so it takes MAX_VALUE * NUM_COLUMNS operations (i.e., constant time). */
size_t totals[NUM_COLUMNS];
for (i = 0; i < NUM_COLUMNS; i++) {
totals[i] = 0;
}
for (i = 0; i < MAX_VALUE; i++) {
for (j = 0; j < NUM_COLUMNS; j++) {
oldCount = counts[j][i];
counts[j][i] = totals[j];
totals[j] += oldCount;
}
}
temp = malloc(sizeof(BigInt) * count);
/* Now perform the actual sorting, using the counts to tell us how to move
the items. Each call below iterates over the whole dataset exactly once,
so this takes COUNT * NUM_COLUMNS operations, or theta(n). */
for (i = 0; i < NUM_COLUMNS; i += 2) {
SortByCounts(values, temp, counts[i ], count, i * BITSHIFT);
SortByCounts(temp, values, counts[i+1], count, (i+1) * BITSHIFT);
}
free(temp);
}
/*-------------------------------------------------------------------------------------------------
** Built-in Quicksorting.
*/
static int BigIntCompare(const void *a, const void *b)
{
BigInt av = *(BigInt *)a;
BigInt bv = *(BigInt *)b;
if (av < bv) return -1;
if (av > bv) return +1;
return 0;
}
static void BuiltInQuicksort(BigInt *values, size_t count)
{
qsort(values, count, sizeof(BigInt), BigIntCompare);
}
/*-------------------------------------------------------------------------------------------------
** File reading.
*/
/* Read a single integer from the given string, skipping initial whitespace, and
store it in 'returnValue'. Returns a pointer to the end of the integer text, or
NULL if no value can be read. */
static char *ReadInt(char *src, int *returnValue)
{
char ch;
int value;
/* Skip whitespace. */
while ((ch = *src) <= 32 && ch != '\r' && ch != '\n') src++;
/* Do we have a valid digit? */
if ((ch = *src++) < '0' || ch > '9')
return NULL;
/* Collect digits into a number. */
value = 0;
do {
value *= 10;
value += ch - '0';
} while ((ch = *src++) >= '0' && ch <= '9');
src--;
/* Return what we did. */
*returnValue = value;
return src;
}
/* Read a single line of values from the input into 'line', and return the number of
values that were read on this line. */
static int ReadLine(FILE *fp, BigInt *line)
{
int numValues;
char buffer[1024];
char *src;
int value;
BigInt result = 0;
if (fgets(buffer, 1024, fp) == NULL) return 0;
buffer[1023] = '\0';
numValues = 0;
src = buffer;
while ((src = ReadInt(src, &value)) != NULL) {
result |= ((BigInt)value << ((NUM_COLUMNS - ++numValues) * BITSHIFT));
}
*line = result;
return numValues;
}
/* Read from file 'fp', which should consist of a sequence of lines with
six decimal integers on each, and write the parsed, packed integer values
to a newly-allocated 'values' array. Returns the number of lines read. */
static size_t ReadInputFile(FILE *fp, BigInt **values)
{
BigInt line;
BigInt *dest;
size_t count, max;
count = 0;
dest = malloc(sizeof(BigInt) * (max = 256));
while (ReadLine(fp, &line)) {
if (count >= max) {
size_t newmax = max * 2;
BigInt *temp = malloc(sizeof(BigInt) * newmax);
memcpy(temp, dest, sizeof(BigInt) * count);
free(dest);
max = newmax;
dest = temp;
}
dest[count++] = line;
}
*values = dest;
return count;
}
/*-------------------------------------------------------------------------------------------------
** File writing.
*/
/* Given a number from 0 to 999 (inclusive), write its string representation to 'dest'
as fast as possible. Returns 'dest' incremented by the number of digits written. */
static char *WriteNumber(char *dest, unsigned int number)
{
if (number >= 100) {
dest += 3;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10, number /= 10;
dest[-3] = '0' + number % 10;
}
else if (number >= 10) {
dest += 2;
dest[-1] = '0' + number % 10, number /= 10;
dest[-2] = '0' + number % 10;
}
else {
dest += 1;
dest[-1] = '0' + number;
}
return dest;
}
/* Write a single "value" (one line of content) to the output file. */
static void WriteOutputLine(FILE *fp, BigInt value)
{
char buffer[1024];
char *dest = buffer;
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
if (i > 0)
*dest++ = ' ';
int number = (value >> (BITSHIFT * (NUM_COLUMNS - i - 1))) & BITMASK;
dest = WriteNumber(dest, (unsigned int)number);
}
*dest++ = '\n';
*dest = '\0';
fwrite(buffer, 1, dest - buffer, fp);
}
/* Write the entire output file as a sequence of values in columns. */
static void WriteOutputFile(FILE *fp, BigInt *values, size_t count)
{
while (count-- > 0) {
WriteOutputLine(fp, *values++);
}
}
/*-------------------------------------------------------------------------------------------------
** Timeval support.
*/
int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)
{
/* Perform the carry for the later subtraction by updating y. */
if (x->tv_usec < y->tv_usec) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
y->tv_usec -= 1000000 * nsec;
y->tv_sec += nsec;
}
if (x->tv_usec - y->tv_usec > 1000000) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000;
y->tv_usec += 1000000 * nsec;
y->tv_sec -= nsec;
}
/* Compute the time remaining to wait. tv_usec is certainly positive. */
result->tv_sec = x->tv_sec - y->tv_sec;
result->tv_usec = x->tv_usec - y->tv_usec;
/* Return 1 if result is negative. */
return x->tv_sec < y->tv_sec;
}
/*-------------------------------------------------------------------------------------------------
** Main.
*/
int main(int argc, char **argv)
{
BigInt *values;
size_t count;
FILE *fp;
struct timeval startTime, endTime, deltaTime;
if (argc != 3) {
fprintf(stderr, "Usage: sixsort input.txt output.txt\n");
return -1;
}
printf("Reading %s...\n", argv[1]);
if ((fp = fopen(argv[1], "r")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for reading.\n", argv[1]);
return -1;
}
count = ReadInputFile(fp, &values);
fclose(fp);
printf("Got %d lines. Sorting.\n", (int)count);
gettimeofday(&startTime, NULL);
RadixSortWithInternalCountingSort(values, count);
/*BuiltInQuicksort(values, count);*/
gettimeofday(&endTime, NULL);
timeval_subtract(&deltaTime, &endTime, &startTime);
printf("Sorted lines in %d seconds and %d microseconds.\n",
(int)deltaTime.tv_sec, (int)deltaTime.tv_usec);
printf("Writing %d lines to %s.\n", (int)count, argv[2]);
if ((fp = fopen(argv[2], "w")) == NULL) {
fprintf(stderr, "Unable to open \"%s\" for writing.\n", argv[2]);
return -1;
}
WriteOutputFile(fp, values, count);
fclose(fp);
free(values);
return 0;
}
Also, for reference, here's my crappy little sample-dataset producer sixsample.c
, using a predictable LCG random-number generator so it always generates the same sample data:
此外,作为参考,这里是我糟糕的小样本数据集生成器sixsample.c,使用可预测的LCG随机数生成器,因此它始终生成相同的样本数据:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <math.h>
#define NUM_COLUMNS 6
#define MAX_VALUE 35
unsigned int RandSeed = 0;
/* Generate a random number from 0 to 65535, inclusive, using a simple (fast) LCG.
The numbers below are the same as used in the ISO/IEC 9899 proposal for C99/C11. */
static int Rand()
{
RandSeed = RandSeed * 1103515245 + 12345;
return (int)(RandSeed >> 16);
}
static int CompareInts(const void *a, const void *b)
{
return *(int *)a - *(int *)b;
}
static void PrintSortedRandomLine()
{
int values[NUM_COLUMNS];
int i;
for (i = 0; i < NUM_COLUMNS; i++) {
values[i] = 1 + (int)(pow((double)Rand() / 65536.0, 2.0) * MAX_VALUE);
}
qsort(values, NUM_COLUMNS, sizeof(int), CompareInts);
/* Lame. */
printf("%d %d %d %d %d %d\n",
values[0], values[1], values[2], values[3], values[4], values[5]);
}
int main(int argc, char **argv)
{
unsigned int numLines;
unsigned int i;
if (argc < 2 || argc > 3) {
fprintf(stderr, "Usage: sixsample numLines [randSeed]\n");
return -1;
}
numLines = atoi(argv[1]);
if (argc > 2)
RandSeed = atoi(argv[2]);
for (i = 0; i < numLines; i++) {
PrintSortedRandomLine();
}
return 0;
}
And finally, here's a Makefile
that can build them the same way I did, on Ubuntu 16, with gcc 5.4.0:
最后,这是一个Makefile,它可以像我一样在Ubuntu 16上使用gcc 5.4.0构建它们:
all: sixsort sixsample
sixsort: sixsort.c
gcc -O6 -Wall -o sixsort sixsort.c
sixsample: sixsample.c
gcc -O -Wall -o sixsample sixsample.c
clean:
rm -f sixsort sixsample
Further Possible Optimizations
进一步可能的优化
I considered further optimizing some parts of this by writing them in pure assembly, but gcc does an excellent job of inlining the functions, unrolling the loops, and generally hitting this with most of the optimizations I would've otherwise considered doing myself in assembly, so it would be difficult to improve it that way. The core operations don't lend themselves very well to SIMD instructions, so gcc's version is likely as fast as it gets.
我考虑通过在纯汇编中编写它们来进一步优化其中的一些部分,但是gcc在内联函数,展开循环方面做得非常出色,并且通常用大多数我认为在汇编中自己做的优化来实现这一点,因此很难以这种方式改进它。核心操作不能很好地适应SIMD指令,因此gcc的版本可能会尽可能快。
This would probably run faster if I compiled it as a 64-bit process, since the CPU would be able to hold each complete record in a single register, but my test Ubuntu VM was 32-bit, so c'est la vie.
如果我将它编译为64位进程,这可能会运行得更快,因为CPU可以将每个完整记录保存在一个寄存器中,但我的测试Ubuntu VM是32位,所以c'est la vie。
Given that the values themselves are all fairly small, you might be able to improve my solution one further by combining code points: So instead of performing six counting sorts, one subsort on each of the seven-bit columns, you could divide up the same 42 bits in other ways: You could perform three subsorts on each of the 14-bit columns. This would require storing 3*2^14 (=3*16384=49152) counts and offsets in memory instead of 6*2^7 (=6*128=768), but since memory is fairly cheap, and the data would still fit in the CPU cache, it might be worth trying. And since the original data is two-digit decimal integers, you could even slice it down the middle and perform a pair of subsorts on each of two 6-decimal-digit columns of values (requiring 2*10^6=2000000 counts and offsets, which still might fit in your CPU cache, depending on your CPU).
鉴于值本身都非常小,您可以通过组合代码点来进一步改进我的解决方案:因此,不是执行六个计数排序,而是在每个七位列上有一个子排序,您可以将它们分开其他方式为42位:您可以在每个14位列上执行三个子输出。这需要在内存中存储3 * 2 ^ 14(= 3 * 16384 = 49152)计数和偏移而不是6 * 2 ^ 7(= 6 * 128 = 768),但由于内存相当便宜,数据仍然存在适合CPU缓存,可能值得尝试。由于原始数据是两位十进制整数,您甚至可以将其切入中间,并在两个6位十进制数值列中的每一列上执行一对子排序(需要2 * 10 ^ 6 = 2000000计数和偏移,仍然可能适合您的CPU缓存,具体取决于您的CPU)。
You could also parallelize it! My implementation lights up one CPU hard, but leaves the rest idle. So another technique to make this faster yet could be to divide the source data into chunks, one per CPU core, sort each chunk independently, and then perform a mergesort-like 'merge' operation at the end to combine the results together. If you find yourself running this computation a lot, parallelizing the work might be something to look into.
你也可以并行化它!我的实现很难点亮一个CPU,但剩下的就是空闲。因此,另一种使此更快的技术可能是将源数据划分为块,每个CPU核心一个,独立地对每个块进行排序,然后在最后执行类似mergesort的“合并”操作以将结果组合在一起。如果你发现自己经常运行这个计算,那么可能需要对工作进行并行化。
Conclusion and Other Thoughts
结论和其他想法
Although I didn't test it myself, it might be fair to compare the performance of a hand-written quicksort, since qsort()
includes function-call overhead when comparing values. I suspect a hand-written solution would beat the 5.3 seconds of qsort()
, but it probably wouldn't compete against the radix sort, both due to quicksort's nonlinear growth and its less-favorable CPU-cache characteristics (it would read each value more often than 7 times on average, so it would probably cost more to pull the data from RAM).
虽然我自己没有对它进行测试,但比较手写快速排序的性能可能是公平的,因为qsort()在比较值时包含函数调用开销。我怀疑手写的解决方案会超过qsort()的5.3秒,但它可能不会与基数排序竞争,这是由于快速排序的非线性增长和不太有利的CPU缓存特性(它会读取每个值)平均而言往往超过7次,因此从RAM中提取数据的成本可能更高。
Also, the cost of the sorting is now dwarfed by the cost of the I/O to read the text file into memory and write the result back out again. Even with my custom parsing code, it still takes several seconds to read the 33 million records, a fraction of a second to sort them, and a few seconds to write them back out again. If your data is already in RAM, that might not matter, but if it's on a disk somewhere, you'll need to start finding ways to optimize that too.
此外,排序的成本现在与将文本文件读入内存并再次将结果写回的I / O成本相形见绌。即使使用我的自定义解析代码,读取3300万条记录仍然需要几秒钟,对它们进行排序只需要几分之一秒,再过几秒就可以将它们写回来。如果您的数据已经在RAM中,那可能无关紧要,但如果它在某个磁盘上,您还需要开始寻找优化方法。
But as the goal of the question was to optimize the sorting itself, sorting 33 million records in only three-quarters of a second isn't too shabby, I think.
但是,由于问题的目标是优化排序本身,我认为,只有四分之三秒的时间排序3300万条记录并不算太糟糕。
#2
2
Jonathan Leffler's comment about reordering the packing is spot on and I had the same thought when looking over your code. The following would be my approach:
Jonathan Leffler关于重新排序包装的评论很有见,在查看代码时我也有同样的想法。以下是我的方法:
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <string.h> // for memcpy
#define ROW_LENGTH 6
#define ROW_COUNT 15
// 1, 2, 3, 4, 5, 6, 6, 8, 9, 10, 13, 15, 1, 4, 5, 6, 7, 9, 1, 4, 5, 6, 7, 8, 3, 18, 19, 20, 25, 34
/*
1 2 3 4 5 6
6 8 9 10 13 15
1 4 5 6 7 9
1 4 5 6 7 8
3 18 19 20 25 34
*/
// Insertion sorting taken from https://*.com/a/2789530/2694511 with modification
static __inline__ int sortUlliArray(unsigned long long int *d, int length){
int i, j;
for (i = 1; i < length; i++) {
unsigned long long int tmp = d[i];
for (j = i; j >= 1 && tmp < d[j-1]; j--)
d[j] = d[j-1];
d[j] = tmp;
}
return i; // just to shutup compiler
}
int cmpfunc (const void * a, const void * b)
{
if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
return 0;
}
int main(){
int array[ROW_COUNT][ROW_LENGTH],
decodedResultsArray[ROW_COUNT][ROW_LENGTH];
const int rawData[] = { 1, 2, 3, 4, 5, 6,
6, 8, 9, 10, 13, 15,
1, 4, 5, 6, 7, 9,
1, 4, 5, 6, 7, 8,
3, 18, 19, 20, 25, 34,
6,17,19,20,22,46,
5,10,34,40,41,49,
5,19,20,37,53,57,
16,23,26,36,42,50,
12,19,20,30,47,53,
16,25,30,48,56,59,
8,11,25,35,36,45,
2,12,16,21,32,40,
4,6,31,36,37,44,
21,28,31,48,52,57
};
memcpy(array, rawData, sizeof(rawData)/sizeof(*rawData)); // copy elements into array memory
// Sort
// precompute keys
unsigned long long int *rowSums = calloc(ROW_COUNT, sizeof(unsigned long long int));
unsigned long long int *sortedSums = rowSums ? calloc(ROW_COUNT, sizeof(unsigned long long int)) : NULL; // if rowSums is null, don't bother trying to allocate.
if(!rowSums || !sortedSums){
free(rowSums);
free(sortedSums);
fprintf(stderr, "Failed to allocate memory!\n");
fflush(stderr); // should be unnecessary, but better to make sure it gets printed
exit(100);
}
int i=0, j=0, k=0;
for(; i < ROW_COUNT; i++){
rowSums[i] = 0; // this should be handled by calloc, but adding this for debug
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100; // probably not the most efficient way to compute this, but this is meant more as an example/proof of concept
unsigned long long int iHere = array[i][j];
rowSums[i] += (iHere * iScalar);
// printf("DEBUG ITERATION REPORT\n\t\tRow #%d\n\t\tColumn #%d\n\t\tiScalar: %llu\n\t\tiHere: %llu\n\t\tCurrent Sum for Row: %llu\n\n", i, j, iScalar, iHere, rowSums[i]);
fflush(stdout);
}
}
memcpy(sortedSums, rowSums, sizeof(unsigned long long int)*ROW_COUNT);
// Some debugging output:
/*
printf("Uncopied Sums:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, rowSums[i]);
printf("Memcopyed sort array:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
*/
clock_t begin = clock();
//qsort(sortedSums, ROW_COUNT, sizeof(unsigned long long int), cmpfunc);
sortUlliArray(sortedSums, ROW_COUNT);
clock_t end = clock();
double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
printf("Time for sort: %lf\n", time_spent);
printf("Before sort array:\n");
for(i=0; i<ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
printf("Unsorted[%d][%d] = %d\n", i, j, array[i][j]);
}
}
printf("Values of sorted computed keys:\n");
for(i=0; i < ROW_COUNT; i++)
printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);
// Unpack:
for(i=0; i < ROW_COUNT; i++){
for(j=0; j < ROW_LENGTH; j++){
unsigned long long int iScalar=1;
for(k=ROW_LENGTH-1; k > j; --k)
iScalar *= 100;
unsigned long long int removalAmount = sortedSums[i]/iScalar;
decodedResultsArray[i][j] = removalAmount;
sortedSums[i] -= (removalAmount*iScalar);
// DEBUG:
// printf("Decoded Result for decoded[%d][%d] = %d\n", i, j, decodedResultsArray[i][j]);
}
}
printf("\nFinal Output:\n");
for(i=0; i < ROW_COUNT; i++){
printf("Row #%d: %d", i, decodedResultsArray[i][0]);
for(j=1; j < ROW_LENGTH; j++){
printf(", %d", decodedResultsArray[i][j]);
}
puts("");
}
fflush(stdout);
free(rowSums);
free(sortedSums);
return 1;
}
Do note, this is not all optimized for the maximum efficiency and it is littered with debug output statements, but nonetheless, it's a proof of concept on how the packing can work. Also, given the number of rows you have to handle, you will probably be better to use qsort()
, but I have it using sortUlliArray(...)
(which is a modified version of the Insert-sort function from this * answer). You'll have to give it a test to see what performs best for your case.
请注意,这并不是为了实现最高效率而进行了优化,而是充满了调试输出语句,但它仍然是包装如何工作的概念证明。另外,考虑到你必须处理的行数,你可能会更好地使用qsort(),但我使用sortUlliArray(...)(这是*回答中Insert-sort函数的修改版本) )。你必须给它一个测试,看看哪种方法最适合你的情况。
All in all, the final output from running this code on the 15 hardcoded rows is:
总而言之,在15个硬编码行上运行此代码的最终输出是:
Row #0: 1, 2, 3, 4, 5, 6
Row #1: 1, 4, 5, 6, 7, 8
Row #2: 1, 4, 5, 6, 7, 9
Row #3: 2, 12, 16, 21, 32, 40
Row #4: 3, 18, 19, 20, 25, 34
Row #5: 4, 6, 31, 36, 37, 44
Row #6: 5, 10, 34, 40, 41, 49
Row #7: 5, 19, 20, 37, 53, 57
Row #8: 6, 8, 9, 10, 13, 15
Row #9: 6, 17, 19, 20, 22, 46
Row #10: 8, 11, 25, 35, 36, 45
Row #11: 12, 19, 20, 30, 47, 53
Row #12: 16, 23, 26, 36, 42, 50
Row #13: 16, 25, 30, 48, 56, 59
Row #14: 21, 28, 31, 48, 52, 57
So, this does appear to handle the cases where the numbers are very similar, which was an issue attributable to the order the numbers were packed in.
因此,这似乎可以处理数字非常相似的情况,这是一个可归因于数字包装顺序的问题。
Anyway, the code above should work, but it is meant as an example, so I will leave it to you to apply necessary optimizations.
无论如何,上面的代码应该可行,但它只是作为一个例子,所以我将留给你应用必要的优化。
Code was tested on a MacBook Air 64-bit with 1.6 GHz Intel Core i5 and 4 GB 1600 MHz DDR3. So, a pretty weak CPU and slow memory, but it was able to do the sort for the 15 rows 0.004 milliseconds, so fairly fast, in my opinion. (That is just a measure of the sort function's speed for the above test case, not for the pre-packing or unpacking speeds, as those could use some optimization.)
代码在具有1.6 GHz Intel Core i5和4 GB 1600 MHz DDR3的MacBook Air 64位上进行了测试。所以,一个非常弱的CPU和缓慢的内存,但它能够对这15行进行排序0.004毫秒,在我看来相当快。 (这只是上述测试用例的排序函数速度的度量,而不是预打包或解包速度,因为它们可以使用一些优化。)
Major credit goes to Doynax and Jonathan Leffler.
主要归功于Doynax和Jonathan Leffler。