linux内核网络协议栈学习笔记(3)

时间:2021-12-11 11:08:27

这篇主题是内核二层包的接受发送,先来看接收:


首先关注下几个状态值

__QUEUE_STATE_FROZEN:发送队列被锁

__QUEUE_STATE_XOFF:发送队列发送功能关闭

__LINK_STATE_START:设备是否开启

__LINK_STATE_PRESENT:设备是否存在,如驱动未安装就不存在

__LINK_STATE_NOCARRIER:设备是否接收到载波,一般用来判断网线有没有接上

__LINK_STATE_DORMANT:设备是否处于休眠状态

__LINK_STATE_LINKWATCH_PENDING:设备通知链上是否有pending事件


netif_running基于__LINK_STATE_PRESENT判断设备是否运行中,

netif_tx_stop_queue,设置__QUEUE_STATE_XOFF标记,使得队列无法发送

netif_tx_start_queue,清除__QUEUE_STATE_XOFF标记,允许队列发送

netif_tx_wake_queue,清除__QUEUE_STATE_XOFF标记,调度设备qdisc,实际就是设置softnet_data的output_queue为设备的qdisc,然后触发NET_TX_SOFTIRQ软中断。netif_schedule_queue与此类似


NAPI_STATE_SCHED:NAPI已启用

NAPI_STATE_DISABLE:NAPI已禁用


OK,言归正传,目前内核有两种接受数据包的方式:硬中断,NAPI,早期的网卡不支持NAPI,只能通过硬中断通知内核,然后调用NET_RX_ACTION软中断来处理。目前的网卡驱动基本都已经支持NAPI了,一般会在驱动程序的硬中断处理例程里调用 napi_schedule,napi_schedule把一个代表设备的napi_struct结构加到当前CPU的softnet_data->poll_list中,然后直接触发软中断,而在软中断里调用驱动提供的poll函数处理网络包,具体可以参考:igb驱动的ixgbe_intr函数, bnx2驱动的bnx2_interrupt函数


以bnx2为例,软中断会调用napi_struct里的虚函数poll,而这个函数指针就指向bnx2_poll,poll 函数会一直尝试读取skb包,直到队列为空为止,这时就调用napi_complete退出NAPI模式,而对应的napi_struct结构都是驱动的内部数据结构,如ixgbe使用的napi_struct实际上是ixgbe_q_vector的成员项


而老式的不支持NAPI的网卡驱动,会调用内核提供的netif_rx函数,netif_rx的实现简要说明如下:

如果RPS/RFS指定了队列对应的CPU,通过get_rps_cpu得到,否则用smp_processor_id得到当前CPU

调用enqueue_to_backlog , 首先拿到CPU对应的softnet_data结构,暂时关闭硬中断,对softnet_data的 input_pkt_queue这个skb list 加锁,然后判断input_pkt_queue的个数是否已经超过了netdev_max_backlog(一般是300),如果超过,会把包drop掉,解锁,恢复硬中断

下面判断softnet_data->input_pkt_queue长度是否为0,为0说明有可能软中断被停了,这时如果softnet_data对应的CPU不是本CPU,直接raise软中断,否则再多一步,把softnet_data->backlog这个napi_struct结构加到softnet_data的poll_list里

如果长度不为0,把skb加到softnet_data->input_pkt_queue的末尾,解锁,恢复硬中断


各位可以看到我反复提到了两个数据结构,napi_struct和softnet_data,这两个算是网卡收包的核心数据结构

/*
 * Incoming packets are placed on per-cpu queues so that
 * no locking is needed.
 */
struct softnet_data
{
    struct Qdisc        *output_queue;
    struct list_head    poll_list;
    struct sk_buff      *completion_queue;


    /* Elements below can be accessed between CPUs for RPS */
    struct call_single_data csd ____cacheline_aligned_in_smp;
    unsigned int            input_queue_head;
    struct sk_buff_head input_pkt_queue;
    struct napi_struct  backlog;
};

softnet_data 是一个per_cpu结构,包含了每个核上对应的发送和接收队列,以及对应的网卡设备

struct Qdisc* ouptput_queue:发送队列规则,发送的时候会用设备的Qdisc

struct list_head poll_list:在CPU上等待被轮询接收包的网络设备列表

struct sk_buff* completion_queue:发送完成的skb被放到这个队列等待被回收

struct sk_buff_head input_pkt_queue:所有非NAPI的设备会共用这个队列,我们在netif_rx的代码中可以看到,所有非NAPI的设备驱动调用了netif_rx的结果就是把skb插入到input_pkt_queue的末尾

struct napi_struct backlog:所有非NAPI的设备会共用这个napi_struct,因为这些驱动没有自己的napi_struct结构,netif_rx中会调用____napi_schedule把backlog加到softnet_data的poll_list设备列表里


struct napi_struct {
    /* The poll_list must only be managed by the entity which
     * changes the state of the NAPI_STATE_SCHED bit.  This means
     * whoever atomically sets that bit can add this napi_struct
     * to the per-cpu poll_list, and whoever clears that bit
     * can remove from the list right before clearing the bit.
     */          
    struct list_head    poll_list;

    unsigned long       state;
    int         weight;
    int         (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
    spinlock_t      poll_lock;
    int         poll_owner;
#endif

    unsigned int        gro_count;

    struct net_device   *dev;
    struct list_head    dev_list;
    struct sk_buff      *gro_list;
    struct sk_buff      *skb;
};

napi_struct结构代表了有包需要被轮询的设备,softnet_data的poll_list是这个列表头,之后就是这个napi_struct设备列表

poll是虚函数指针,对于非NAPI驱动而言,是process_backlog

state表示NAPI是否被启动,要么是 NAPI_STATE_SCHED,或者 NAPI_STATE_DISABLE

struct net_device* dev 指向了对应的设备结构


下面来看net_rx_action软中断处理例程的实现:

首先得到当前CPU的softnet_data, softnet_data->poll_list,下面短暂的关闭硬中断,查看softnet_data->poll_list是否有设备还有包未收完,如果没了那么打开硬中断返回

如果poll_list还有设备,如果这次软中断处理例程已经超过来2个jiffy,或者用完来netdev_budget,那么这次结束,重新raise一个软中断,打开硬中断后返回

否则的话,开始正式处理包,这时已经可以打开硬中断了,OK,通过poll_list的list_head得到对应的napi_struct结构,然后调用napi_struct->poll函数把包交给上层协议栈处理,poll函数的第二个参数是一个quota,表示一次poll可以处理的最多包的个数,如果poll返回后得知处理的包已经到来quota,说明这个设备里的包这次poll还没处理完,但下面就要转到下一个设备中了,怎么办?如果此时NAPI没有被DISABLE,那么把这个设备重新加到poll_list末尾,否则调用napi_complete,移除该设备,此时napi_struct->state的 NAPI_STATE_SCHED会被清除


最后来看下process_backlog函数,所有非NAPI设备都会调用该函数来接收设备等待轮询的包:

首先通过container_of宏,从struct napi_struct* backlog 得到对应的 struct softnet_data,然后开始循环处理skb,直到过去一个jiffy,或者处理的包的个数超过了quota

处理的过程很简单,对input_pkt_queue加锁,调用__skb_dequeue得到一个skb,如果skb为空,说明input_pkt_queue已经空了,这时可以删除整个poll_list列表,清掉所有napi_struct的state,解锁后返回,否则继续循环下去



下面是发送的专题:

发送队列有如下两个状态:

__QUEUE_STATE_XOFF

__QUEUE_STATE_FROZEN


netif_start_queue就是用来打开发送队列,里面调用了netif_tx_start_queue(netdev_get_tx_queue(dev, 0)),打开了net_device->_tx中的第0个发送队列,_tx的定义如下

struct netdev_queue *_tx ____cacheline_aligned_in_smp

内核为发送队列定义了结构struct netdev_queue

struct netdev_queue {
/* 
 * read mostly part
 */ 
    struct net_device   *dev;
    struct Qdisc        *qdisc;
    unsigned long       state;
    struct Qdisc        *qdisc_sleeping;
/*  
 * write mostly part    
 */ 
    spinlock_t      _xmit_lock ____cacheline_aligned_in_smp;
    int         xmit_lock_owner;
    /*
     * please use this field instead of dev->trans_start
     */
    unsigned long       trans_start;
    unsigned long       tx_bytes;
    unsigned long       tx_packets;
    unsigned long       tx_dropped;
} ____cacheline_aligned_in_smp;

我们可以在XPS的patch中看到_tx的用法,_tx是发送队列的数组,alloc_netdev_mqs中传入了txqs, rxqs,代表初始发送,接收队列的个数,这个函数由alloc_netdev调用

alloc_netdev_mqs 首先计算一个alloc_size,这个size是由sizeof(struct net_device)和一个private size组成,private size代表了netdevice extension的大小,调用kzalloc分配好net_device和extension的大小(extension一般留给驱动去用)

下面开始初始化发送队列,把txqs赋值给dev->num_tx_queues,dev->real_num_tx_queues,调用netif_alloc_netdev_queues给_tx数组分配多个发送队列netdev_queue,每个_tx[i]->dev都指向net_device设备。

之后初始化接收队列,把rxqs赋值给dev->num_rx_queues, dev->real_num_rx_queues,调用netif_alloc_rx_queues初始化接收队列netdev_rx_queue

/* This structure contains an instance of an RX queue. */
struct netdev_rx_queue {
    struct rps_map *rps_map;
    struct rps_dev_flow_table *rps_flow_table;
    struct kobject kobj;
    struct net_device *dev;
} ____cacheline_aligned_in_smp;

和接收类似,发送也有如下队列控制函数

netif_tx_start_queue:开启发送队列,即清除__QUEUE_STATE_XOFF标志

netif_tx_stop_queue:关闭发送队列,即设置__QUEUE_STATE_XOFF标志

netif_tx_wake_queue:唤醒发送队列,首先清除__QUEUE_STATE_XOFF,后调用__netif_schedule(),该函数把netdev_queue->qdisc赋值给softnet_data->output_queue,然后raise软中断发送


Qdisc的状态码如下:

__QDISC_STATE_RUNNING:qdisc正在运行

__QDISC_STATE_SCHED:qdisc等待被调度运行

__QDISC_STATE_DEACTIVATED:qdisc不可用


qdisc发送函数qdisc_run,会检查并设置__QDISC_STATE_RUNNING状态,如果之前不是__QDISC_STATE_RUNNING,调用__qdisc_run

__qdisc_run是一个死循环,反复调用qdisc_restart,直到有其他进程需要CPU,或者运行时间超过了一个jiffy,这时会调__netif_schedule把qdisc状态设置为__QDISC_STATE_SCHED,然后调用__netif_reschedule把当前qdisc加到当前CPU softnet_data->output_queue的链表头部,触发net_tx_action软中断等待被CPU调度执行

qdisc_restart做的事情,就是把队列里的skb想办法发出去,首先通过dequeue_skb拿到第一个skb,然后调用sch_direct_xmit

sch_direct_xmit 首先要拿到 netdev_queue->_xmit_lock,前提是NETIF_F_LLTX没被设置(这个标记表示可以无锁发送,因此也就不需要获得什么锁了),下面调用dev_hard_start_xmit,让驱动去发送这个skb包

dev_hard_start_xmit 首先看是否需要做gso,如果需要gso,那么调用dev_gso_segment,里面调用skb_gso_segment把单个的skb分片为多个skb segment,形成一个skb链表,然后对每个skb分片,调用驱动程序的ndo_start_xmit把包发出去。 不需要gso的话,也是调用驱动的ndo_start_xmit发包

sch_direct_xmit 判断 dev_hard_start_xmit 的返回值

NETDEV_TX_OK,此时不会释放skb,而是通过NET_TX_ACTION软中断来做清理

NETDEV_TX_BUSY,调用dev_requeue_skb把skb重新放入qdisc队列,重新调度软中断发送

NETDEV_TX_LOCKED,表示有另外的CPU拿到了驱动发送包的锁,这时记录下一次collision之后,调用dev_requeue_skb把skb重新放入qdisc队列


struct Qdisc
{
    int             (*enqueue)(struct sk_buff *skb, struct Qdisc *dev);
    struct sk_buff *    (*dequeue)(struct Qdisc *dev);
    unsigned        flags;
#define TCQ_F_BUILTIN       1
#define TCQ_F_THROTTLED     2
#define TCQ_F_INGRESS       4
#define TCQ_F_CAN_BYPASS    8
#define TCQ_F_MQROOT        16
#define TCQ_F_WARN_NONWC    (1 << 16)
    int         padded;
    struct Qdisc_ops    *ops;
    struct qdisc_size_table *stab;
    struct list_head    list;
    u32         handle;
    u32         parent;
    atomic_t        refcnt;
    struct gnet_stats_rate_est  rate_est;
    int         (*reshape_fail)(struct sk_buff *skb,
                    struct Qdisc *q);

    void            *u32_node;

    /* This field is deprecated, but it is still used by CBQ
     * and it will live until better solution will be invented.
     */
    struct Qdisc        *__parent;
    struct netdev_queue *dev_queue;
    struct Qdisc        *next_sched;

    struct sk_buff      *gso_skb;
    /*
     * For performance sake on SMP, we put highly modified fields at the end
     */
    unsigned long       state;
    struct sk_buff_head q;
    struct gnet_stats_basic_packed bstats;
    struct gnet_stats_queue qstats;
};


dev_queue_xmit是驱动发送包时调用的函数:如果skb是分片,调用__skb_linearize合成大块的skb;如果checksum不对,直接drop;如果skb是分片但设备不支持NETIF_F_FRAGLIST,也drop;之后如果qdisc的enqueue函数不为空,说明是有队列的设备,调用__dev_xmit_skb发送包,其过程也是调用了sch_direct_xmit,__qdisc_run的函数来完成的

无队列设备这里跳过了


最后来看下发送软中断NET_TX_ACTION做的事情:

net_tx_action 函数做两件事:

static void net_tx_action(struct softirq_action *h)

    struct softnet_data *sd = &__get_cpu_var(softnet_data);

    if (sd->completion_queue) {
        struct sk_buff *clist;

        local_irq_disable();
        clist = sd->completion_queue;
        sd->completion_queue = NULL;
        local_irq_enable();

        while (clist) {
            struct sk_buff *skb = clist;
            clist = clist->next;

            WARN_ON(atomic_read(&skb->users));
            trace_kfree_skb(skb, net_tx_action);
            __kfree_skb(skb);
        }
    }

...

首先判断softnet_data里的completion_queue是否为空,对于发送而言,硬中断只是通过网卡把包发走,但是回收内存的事情是通过软中断来做的,设备驱动发送完数据之后,会调用dev_kfree_skb_irq,该函数代码如下:
void dev_kfree_skb_irq(struct sk_buff *skb)
{           
    if (atomic_dec_and_test(&skb->users)) {
        struct softnet_data *sd;
        unsigned long flags;
       
        local_irq_save(flags);
        sd = &__get_cpu_var(softnet_data);
        skb->next = sd->completion_queue;
        sd->completion_queue = skb;
        raise_softirq_irqoff(NET_TX_SOFTIRQ);
        local_irq_restore(flags);
    }
}

可以看到dev_kfree_skb_irq只是把发送完成的skb加到softnet_data->completion_queue里面,然后触发NET_TX_SOFTIRQ软中断来回收

net_tx_softirq的第一件事就是回收这些skb,首先禁止硬中断拿到softnet_data->completion_queue的skb链表,恢复硬中断,对于链表里的每一个skb,调用__kfree_skb


net_tx_action做的第二件事是把未传输完成的skb发送完,这种场景对应了netif_wake_tx_queue唤醒之前因某种原因休眠了的队列,netif_wake_tx_queue唤醒了队列(其实就是设置了__QDISC_STATE_RUNNING标志位)之后,会触发一个net_tx_action来做这件事,代码如下:

    if (sd->output_queue) {
        struct Qdisc *head;

        local_irq_disable();
        head = sd->output_queue;
        sd->output_queue = NULL;
        local_irq_enable();

        while (head) {
            struct Qdisc *q = head;
            spinlock_t *root_lock;

            head = head->next_sched;

            root_lock = qdisc_lock(q);
            if (spin_trylock(root_lock)) {
                smp_mb__before_clear_bit();
                clear_bit(__QDISC_STATE_SCHED,
                      &q->state);
                qdisc_run(q);
                spin_unlock(root_lock);
            } else {
                if (!test_bit(__QDISC_STATE_DEACTIVATED,
                          &q->state)) {
                    __netif_reschedule(q);
                } else {
                    smp_mb__before_clear_bit();
                    clear_bit(__QDISC_STATE_SCHED,
                          &q->state);
                }
            }
        }
    }
}

首先禁止/恢复硬中断拿到softnet_data->output_queue,对于output_queue里的每一个Qdisc,通过qdisc_run发送包


事实上,并不是每个驱动都会这么做,在bnx2的bnx2_start_xmit函数中,并没有调用dev_kfree_skb_irq,而是直接调用dev_kfree_skb,但当驱动的发送环空间不够时,会依次调用netif_tx_stop_queue,smp_mb,当bnx2的发送环空间超过了唤醒阀值 tx_wake_thresh 后,调用 netif_tx_wake_queue


关于dev_queue_xmit,这篇blog的注解也不错  http://blog.csdn.net/peimichael/article/details/4699609