合并N个有序链表与FQ公平调度

时间:2021-10-15 03:51:39

下大雨了,于是就想表达一些只有下雨才能表达的东西。夜半酒酣惊觉起,使我流泪忆江南…前天晚上下班带着小小在暴雨中狂奔,非常舒服,其实也算是流言终结者吧。反驳一下几千年来在我国北方通过长辈代代相传的淋雨和感冒之间的因果关系。

  昨天早上很早起来,听雨作文,今天早上继续,文章不算太长。

合并两个有序链表

这是一道超级常见的课后作业题或者面试题,网上答案一搜一箩筐,我自己也写了一个不会编程版
有序链表合并C语言递归版–我稍微会一点编程https://blog.csdn.net/dog250/article/details/80154795
写是写出来了,但写得不好。不管怎么说也算是一个答案。

  但是,这个问题有什么实际意义呢?或者说它是不是仅仅就是一个单纯考查你对编程技法掌握程度的问题呢?类似我们大学教科书课后的习题?用“*”号打印个菱形?写个冒泡排序?

  其实并不是。在看一个实际问题之前,我们先暂时抛开代码实现,来看一下所谓的合并两个有序链表观感上到底需要做什么。这个问题相信一个完全不懂IT技术的人也能给出答案。

  如下图,两个有序链表,假设均为升序链表:

合并N个有序链表与FQ公平调度

所谓的合并,其实就是做下面的事:

合并N个有序链表与FQ公平调度

就是这么一个简单的过程,在出现大小交错的锚点处,断开旧链接,接入新链接,仅此而已,剩下的问题就是如何用Java,C/C++这种语言来表达了,事实证明,用编程语言表达它并不比用自然语言表达它更困难。

  好的,原理就这么简单,当你抛开代码的时候,才会知道它到底有什么用。现在来看一个实际问题

Linux FQ实现

Linux内核在3.12版本引入了FQ这个Schedule,它的实现参照我的这篇文章:
Linux FQ 队列实现原理浅析https://blog.csdn.net/dog250/article/details/80025939

其实,你有没有发现,FQ的dequeue例程要做的就是把N个流所代表的N个有序链表进行合并并输出的过程,只不过这个过程是动态的!我来解释一下。

  在FQ中,每一个socket代表一个流,系统为每一个流维护了一个发送队列,该队列可以视为一个skb作为元素,按照该socket指示的pacing确定的发送时间升序排列的有序链表,系统中有N个socket,就有N条该类型链表:

合并N个有序链表与FQ公平调度

注意,每一个链表的时间序都是按照skb在该流的上的相对发送时间排序的,纵然软件层面可以维护很多的队列,网卡却只有一张,网线只有一条,每个时刻只能有一个skb可以被发送,所以,所有的队列中的所有的数据包还需要基于绝对发送时间进行排序,我们按照绝对发送时间展开上图,即将多个链表映射到同一个排序维度(简略版):

合并N个有序链表与FQ公平调度

然后绝对发送序列就很明显了:

合并N个有序链表与FQ公平调度

这其实就是一个典型的合并N个有序链表的实例!合并后的结果完全可以有足够的信息指示数据包调度系统什么时间做什么事

  Linux FQ的实现使用了红黑树来定义锚点,即所有的链表中下一个要check的节点都被维护在一棵红黑树中,这样做的目的是高效取出离当前节点最近的锚点进行check。

合并N个有序链表的基本思想

很简单:

  1. 尽量保持在当前链表向后遍历,直到当前链表下一个节点的key大于其它链表最小锚点的key;
  2. 将当前节点的下一个节点作为一个锚点插入红黑树,从该最小锚点开始继续遍历,重复第1步。

这是一个递归实现的好场所!

合并N个有序链表的C语言实现

不得不说这其实就是Linux FQ实现的抽象版本了,因此必然也是用红黑树来维护锚点咯…

  我并没有自己从零开始实现一版红黑树,估计我也写不出来…我直接用了github上别人的实现,我使用的是这一版:
https://github.com/manuscola/rbtree
值得注意的是,这一版红黑树实现是不允许插入键值相同的元素的,为此要稍微做一点点修改,使得重复键值元素可以插入,这样代码实现起来按照统一的方式处理会更加清爽简单些。

  代码如下所示:

#include <stdio.h>
#include <stdlib.h>
#include"rbtree.h"

int a[] = {1,3,7,8,10,11,12,15,19,21,22,24,25,26};
int b[] = {0,4,5,6,9,16,17,18,27,30,31,32};
int c[] = {13,14,20,23,28,29};
int d[] = {2,33,100};

struct list {
        struct list *next;
        struct rbtree_node *node;
        int v;
};

int compare(void* key_a,void* key_b)
{
        int v_a = *(int *) (key_a);
        int v_b = *(int *) (key_b);
        if (v_a > v_b) {
                return 1;
        } else if (v_a == v_b) {
                return 0;
        }

        return -1;
}

void merge(struct list *iter, struct rbtree* tree)
{
        struct list *last = iter, *base = NULL;
        struct rbtree_node *base_node = rbtree_min(tree);
    // 取出最小的锚点并从红黑树中将其删除
        if (base_node) {
                base = (struct list *)base_node->data;
                __rbtree_remove(base_node,tree);
        }


        if (!iter) {
                return;
        }
        iter = iter->next;
    // 一直遍历,直到next大于锚点
        while (iter && (!base || iter->v < base->v)) {
                last = iter;
                iter = iter->next;
        }
    // 将next作为新的锚点加入红黑树
        if (last->next) {
                rbtree_insert(tree, &last->next->v, last->next);
        }
    // 递归重复
        last->next = base;
        merge(base, tree);
}

int main(int argc, char **argv)
{
        int i = 0;
        struct list *al = (struct list*)calloc(14, sizeof(*al));
        struct list *bl = (struct list*)calloc(12, sizeof(*bl));
        struct list *cl = (struct list*)calloc(6, sizeof(*cl));
        struct list *dl = (struct list*)calloc(3, sizeof(*dl));
        struct list *itera;
        struct list *iterb;
        struct list *iterc;
        struct list *iterd;
        struct list *base;

        struct rbtree* tree = rbtree_init(compare);

        // create 3 ordered-lists
        {
                for (i = 0; i < 14; i++) {
                        itera = &al[i];
                        itera->v = a[i];
                        itera->next = &al[i+1];
                        if (i == 0) {
                                rbtree_insert(tree, &itera->v, itera);
                        }
                }
                itera->next = NULL;

                for (i = 0; i < 12; i++) {
                        iterb = &bl[i];
                        iterb->v = b[i];
                        iterb->next = &bl[i+1];
                        if (i == 0) {
                                rbtree_insert(tree, &iterb->v, iterb);
                        }
                }
                iterb->next = NULL;

                for (i = 0; i < 6; i++) {
                        iterc = &cl[i];
                        iterc->v = c[i];
                        iterc->next = &cl[i+1];
                        if (i == 0) {
                                rbtree_insert(tree, &iterc->v, iterc);
                        }
                }
                iterc->next = NULL;

                for (i = 0; i < 3; i++) {
                        iterd = &dl[i];
                        iterd->v = d[i];
                        iterd->next = &dl[i+1];
                        if (i == 0) {
                                rbtree_insert(tree, &iterd->v, iterd);
                        }
                }
                iterd->next = NULL;
        }

        // merge n sorted-lists
        {
                struct rbtree_node *node;

                node = rbtree_min(tree);
                __rbtree_remove(node,tree);

                base = (struct list *)node->data;
                merge(base, tree);
        }

        // print the result
        {// 注意,从base开始遍历
                for (; base; base=base->next) {
                        printf("%d %p\n", base->v, base);
                }
        }
}

注意,之所以选择红黑树而不是最小堆,是因为红黑树的cache亲和性要比最小堆好!在红黑树的旋转过程中,最大限度地在树的平衡性以及节点的局部性之间取得了比较好的折中,这点是我看中的。此外,Linux FQ以及CFS等大量使用了红黑树,见贤思齐,所以我也就用了。

总结

本文用Linux FQ包调度机制解释了合并N个有序链表到底有什么用,在该例子中,其实实施的就是将N个相对时间序列整合到一个绝对时间序列的过程,这是一个典型的案例。

  除此之外,链表合并在批处理任务调度系统中也有非常多的应用,和Linux FQ一样,其实也是一个N到1整合的过程。链表合并并不单单是一道面试题,一道课后作业,它确确实实是一把真刀一杆真枪啊!