Linux时间子系统之五:低分辨率定时器的原理和实现

时间:2022-10-12 02:15:13

专题文档汇总目录

Notes:低精度timer在内核中的数据结构以及API接口;低精度timer精巧高效的分组,使用cascade进行定时器移位,组内Timer FIFO;低精度Timer的初始化流程。

原文地址:Linux时间子系统之五:低分辨率定时器的原理和实现

利用定时器,我们可以设定在未来的某一时刻,触发一个特定的事件。所谓低分辨率定时器,是指这种定时器的计时单位基于jiffies值的计数,也就是说,它的精度只有1/HZ,假如你的内核配置的HZ是1000,那意味着系统中的低分辨率定时器的精度就是1ms。早期的内核版本中,内核并不支持高精度定时器,理所当然只能使用这种低分辨率定时器,我们有时候把这种基于HZ的定时器机制成为时间轮:time wheel。虽然后来出现了高分辨率定时器,但它只是内核的一个可选配置项,所以直到目前最新的内核版本,这种低分辨率定时器依然被大量地使用着。
/*****************************************************************************************************/
声明:本博内容均由http://blog.csdn.NET/droidphone原创,转载请注明出处,谢谢!
/*****************************************************************************************************/

1.  定时器的使用方法

在讨论定时器的实现原理之前,我们先看看如何使用定时器。要在内核编程中使用定时器,首先我们要定义一个time_list结构,该结构在include/Linux/timer.h中定义:

  1. struct timer_list {
  2. /*
  3. * All fields that change during normal runtime grouped to the
  4. * same cacheline
  5. */
  6. struct list_head entry;
  7. unsigned long expires;
  8. struct tvec_base *base;
  9. void (*function)(unsigned long);
  10. unsigned long data;
  11. int slack;
  12. ......
  13. };

entry  字段用于把一组定时器组成一个链表,至于内核如何对定时器进行分组,我们会在后面进行解释。

expires  字段指出了该定时器的到期时刻,也就是期望定时器到期时刻的jiffies计数值。

base  每个cpu拥有一个自己的用于管理定时器的tvec_base结构,该字段指向该定时器所属的cpu所对应tvec_base结构。

function  字段是一个函数指针,定时器到期时,系统将会调用该回调函数,用于响应该定时器的到期事件。

data  该字段用于上述回调函数的参数。

slack 对有些对到期时间精度不太敏感的定时器,到期时刻允许适当地延迟一小段时间,该字段用于计算每次延迟的HZ数。

Notes:

定时器的初始化有静态和动态两种方法。

静态初始化通过DEFINE_TIMER宏定义,创建一个结构体变量:

#define DEFINE_TIMER(_name, _function, _expires, _data) \
  struct timer_list _name = \
    TIMER_INITIALIZER(_function, _expires, _data)

#define TIMER_INITIALIZER(_function, _expires, _data) { \
  .entry = { .prev = TIMER_ENTRY_STATIC }, \
  .function = (_function), \
  .expires = (_expires), \
  .data = (_data), \
  .base = &boot_tvec_bases, \
  .slack = -1, \
  __TIMER_LOCKDEP_MAP_INITIALIZER( \
    __FILE__ ":" __stringify(__LINE__)) \
}

动态初始化是在函数中对timer_list成员配置。

struct timer_list timer;---------------结构体变量

init_timer(&timer);--------------------初始化出expires/function/data之外的结构体成员

timer.expires = expires;--------------超时值

timer.fuction = function;-------------超时函数

timer.data = data;---------------------超时函数参数

一个定时其完整周期处理init,还包括add-mod-delete。

void add_timer(struct timer_list *timer);----激活一个定时器

int mod_timer(struct timer_list *timer, unsigned long expires);----修改到期定时器

int del_timer(struct timer_list * timer);----删除定时器

定时器系统还提供了以下这些API供我们使用:

  • void add_timer_on(struct timer_list *timer, int cpu);  // 在指定的cpu上添加定时器
  • int mod_timer_pending(struct timer_list *timer, unsigned long expires);  //  只有当timer已经处在激活状态时,才修改timer的到期时刻
  • int mod_timer_pinned(struct timer_list *timer, unsigned long expires);  //  当
  • void set_timer_slack(struct timer_list *time, int slack_hz);  //  设定timer允许的到期时刻的最大延迟,用于对精度不敏感的定时器
  • int del_timer_sync(struct timer_list *timer);  //  如果该timer正在被处理中,则等待timer处理完成才移除该timer

2.  定时器的软件架构

低分辨率定时器是基于HZ来实现的,也就是说,每个tick周期,都有可能有定时器到期,关于tick如何产生,请参考:Linux时间子系统之四:定时器的引擎:clock_event_device。系统中有可能有成百上千个定时器,难道在每个tick中断中遍历一下所有的定时器,检查它们是否到期?内核当然不会使用这么笨的办法,它使用了一个更聪明的办法:按定时器的到期时间对定时器进行分组。因为目前的多核处理器使用越来越广泛,连智能手机的处理器动不动就是4核心,内核对多核处理器有较好的支持,低分辨率定时器在实现时也充分地考虑了多核处理器的支持和优化。为了较好地利用cache line,也为了避免cpu之间的互锁,内核为多核处理器中的每个cpu单独分配了管理定时器的相关数据结构和资源,每个cpu独立地管理属于自己的定时器。

2.1  定时器的分组

首先,内核为每个cpu定义了一个tvec_base结构指针:

static DEFINE_PER_CPU(struct tvec_base *, tvec_bases) = &boot_tvec_bases;

tvec_base结构的定义如下:

struct tvec_base {
  spinlock_t lock;
  struct timer_list *running_timer;
  unsigned long timer_jiffies;------------------大多数情况下即是jiffies
  unsigned long next_timer;-------------------下一个即将到期的定时器expires
  struct tvec_root tv1;--------------------------和其他定时器分组不同,tvec_root
  struct tvec tv2;
  struct tvec tv3;
  struct tvec tv4;
  struct tvec tv5;
} ____cacheline_aligned;

running_timer  该字段指向当前cpu正在处理的定时器所对应的timer_list结构。

timer_jiffies  该字段表示当前cpu定时器所经历过的jiffies数,大多数情况下,该值和jiffies计数值相等,当cpu的idle状态连续持续了多个jiffies时间时,当退出idle状态时,jiffies计数值就会大于该字段,在接下来的tick中断后,定时器系统会让该字段的值追赶上jiffies值。

next_timer  该字段指向该cpu下一个即将到期的定时器。

tv1--tv5 这5个字段用于对定时器进行分组,实际上,tv1--tv5都是一个链表数组,其中tv1的数组大小为TVR_SIZE, tv2 tv3 tv4 tv5的数组大小为TVN_SIZE,根据CONFIG_BASE_SMALL配置项的不同,它们有不同的大小:

  1. #define TVN_BITS (CONFIG_BASE_SMALL ? 4 : 6)
  2. #define TVR_BITS (CONFIG_BASE_SMALL ? 6 : 8)
  3. #define TVN_SIZE (1 << TVN_BITS)
  4. #define TVR_SIZE (1 << TVR_BITS)
  5. #define TVN_MASK (TVN_SIZE - 1)
  6. #define TVR_MASK (TVR_SIZE - 1)
  7. struct tvec {
  8. struct list_head vec[TVN_SIZE];
  9. };
  10. struct tvec_root {
  11. struct list_head vec[TVR_SIZE];
  12. };

默认情况下,没有使能CONFIG_BASE_SMALL,TVR_SIZE的大小是256,TVN_SIZE的大小则是64,当需要节省内存空间时,也可以使能CONFIG_BASE_SMALL,这时TVR_SIZE的大小是64,TVN_SIZE的大小则是16,以下的讨论我都是基于没有使能CONFIG_BASE_SMALL的情况。当有一个新的定时器要加入时,系统根据定时器到期的jiffies值和timer_jiffies字段的差值来决定该定时器被放入tv1至tv5中的哪一个数组中,最终,系统中所有的定时器的组织结构如下图所示:
Linux时间子系统之五:低分辨率定时器的原理和实现
                                                        图 2.1.1  定时器在系统中的组织结构

2.2  定时器的添加

要加入一个新的定时器,我们可以通过api函数add_timer或mod_timer来完成,最终的工作会交由internal_add_timer函数来处理。该函数按以下步骤进行处理:

  • 计算定时器到期时间和所属cpu的tvec_base结构中的timer_jiffies字段的差值,记为idx;
  • 根据idx的值,选择该定时器应该被放到tv1--tv5中的哪一个链表数组中,可以认为tv1-tv5分别占据一个32位数的不同比特位,tv1占据最低的8位,tv2占据紧接着的6位,然后tv3再占位,以此类推,最高的6位分配给tv5。最终的选择规则如下表所示:
链表数组 idx范围
tv1 0-255(2^8)
tv2 256--16383(2^14)
tv3 16384--1048575(2^20)
tv4 1048576--67108863(2^26)
tv5 67108864--4294967295(2^32)

确定链表数组后,接着要确定把该定时器放入数组中的哪一个链表中,如果时间差idx小于256,按规则要放入tv1中,因为tv1包含了256个链表,所以可以简单地使用timer_list.expires的低8位作为数组的索引下标,把定时器链接到tv1中相应的链表中即可。如果时间差idx的值在256--18383之间,则需要把定时器放入tv2中,同样的,使用timer_list.expires的8--14位作为数组的索引下标,把定时器链接到tv2中相应的链表中,。定时器要加入tv3 tv4 tv5使用同样的原理。经过这样分组后的定时器,在后续的tick事件中,系统可以很方便地定位并取出相应的到期定时器进行处理。以上的讨论都体现在internal_add_timer的代码中:

static void internal_add_timer(struct tvec_base *base, struct timer_list *timer)
{
  unsigned long expires = timer->expires;
  unsigned long idx = expires - base->timer_jiffies;------------------也即timer->expires-base->timer_jiffies,差值范围可以大致判断属于哪一个分组。
  struct list_head *vec;

  if (idx < TVR_SIZE) {----------------------------------------------------tv1
    int i = expires & TVR_MASK;
    vec = base->tv1.vec + i;
  } else if (idx < 1 << (TVR_BITS + TVN_BITS)) {-------------------tv2
    int i = (expires >> TVR_BITS) & TVN_MASK;
    vec = base->tv2.vec + i;
  } else if (idx < 1 << (TVR_BITS + 2 * TVN_BITS)) {--------------tv3
    int i = (expires >> (TVR_BITS + TVN_BITS)) & TVN_MASK;
    vec = base->tv3.vec + i;
  } else if (idx < 1 << (TVR_BITS + 3 * TVN_BITS)) {--------------tv4
    int i = (expires >> (TVR_BITS + 2 * TVN_BITS)) & TVN_MASK;
    vec = base->tv4.vec + i;
  } else if ((signed long) idx < 0) {-------------------------------------异常处理,随机取了一个0~255之间的数值
    /*
    * Can happen if you add a timer with expires == jiffies,
    * or you set a timer to go off in the past
    */
    vec = base->tv1.vec + (base->timer_jiffies & TVR_MASK);
  } else {
    int i;
    /* If the timeout is larger than MAX_TVAL (on 64-bit
    * architectures or with CONFIG_BASE_SMALL=1) then we
    * use the maximum timeout.
    */
    if (idx > MAX_TVAL) {--------------------------------------------限定timer最大值2^32-1,假设jiffies为5ms,4294967295*5ms=248.6天
    idx = MAX_TVAL;
    expires = idx + base->timer_jiffies;
    }
    i = (expires >> (TVR_BITS + 3 * TVN_BITS)) & TVN_MASK;
    vec = base->tv5.vec + i;---------------------------------------tv5
  }
  /*
  * Timers are FIFO:
  */
  list_add_tail(&timer->entry, vec);---------------------------------相同vec的timer采取FIFO策略
}

2.2  定时器的到期处理

经过2.1节的处理后,系统中的定时器按到期时间有规律地放置在tv1--tv5各个链表数组中,其中tv1中放置着在接下来的256个jiffies即将到期的定时器列表,需要注意的是,并不是tv1.vec[0]中放置着马上到期的定时器列表,tv1.vec[1]中放置着将在jiffies+1到期的定时器列表。因为base.timer_jiffies的值一直在随着系统的运行而动态地增加,原则上是每个tick事件会加1,base.timer_jiffies代表者该cpu定时器系统当前时刻,定时器也是动态地加入头256个链表tv1中,按2.1节的讨论,定时器加入tv1中使用的下标索引是定时器到期时间expires的低8位,所以假设当前的base.timer_jiffies值是0x34567826,则马上到期的定时器是在tv1.vec[0x26]中,如果这时候系统加入一个在jiffies值0x34567828到期的定时器,他将会加入到tv1.vec[0x28]中,运行两个tick后,base.timer_jiffies的值会变为0x34567828,很显然,在每次tick事件中,定时器系统只要以base.timer_jiffies的低8位作为索引,取出tv1中相应的链表,里面正好包含了所有在该jiffies值到期的定时器列表。

Notes:8+6+6+6+6=32位

那什么时候处理tv2--tv5中的定时器?每当base.timer_jiffies的低8位为0值时,这表明base.timer_jiffies的第8-13位有进位发生,这6位正好代表着tv2,这时只要按base.timer_jiffies的第8-13位的值作为下标,移出tv2中对应的定时器链表,然后用internal_add_timer把它们从新加入到定时器系统中来,因为这些定时器一定会在接下来的256个tick期间到期,所以它们肯定会被加入到tv1数组中,这样就完成了tv2往tv1迁移的过程。同样地,当base.timer_jiffies的第8-13位为0时,这表明base.timer_jiffies的第14-19位有进位发生,这6位正好代表着tv3,按base.timer_jiffies的第14-19位的值作为下标,移出tv3中对应的定时器链表,然后用internal_add_timer把它们从新加入到定时器系统中来,显然它们会被加入到tv2中,从而完成tv3到tv2的迁移,tv4,tv5的处理可以以此作类推。具体迁移的代码如下,参数index为事先计算好的高一级tv的需要迁移的数组索引:

static int cascade(struct tvec_base *base, struct tvec *tv, int index)
{
  /* cascade all the timers from tv up one level */
  struct timer_list *timer, *tmp;
  struct list_head tv_list;

  list_replace_init(tv->vec + index, &tv_list);-------------------------用tv_list替换tv->vec+index表头,将tv->vec+index重新初始化,即为空,链表不会有timer存在。

  /*
  * We are removing _all_ timers from the list, so we
  * don't have to detach them individually.
  */
  list_for_each_entry_safe(timer, tmp, &tv_list, entry) {----------遍历刚取出的tv_list里的timer
    BUG_ON(tbase_get_base(timer->base) != base);
    internal_add_timer(base, timer);--------------------------------重新加入到定时器系统中,实际上将会迁移到下一级的tv数组中
  }

  return index;
}

每个tick事件到来时,内核会在tick定时中断处理期间激活定时器软中断:TIMER_SOFTIRQ,关于软件中断,请参考另一篇博文:Linux中断(interrupt)子系统之五:软件中断(softIRQ。TIMER_SOFTIRQ的执行函数是__run_timers,它实现了本节讨论的逻辑,取出tv1中到期的定时器,执行定时器的回调函数,由此可见,低分辨率定时器的回调函数是执行在软件中断上下文中的,这点在写定时器的回调函数时需要注意。__run_timers的代码如下:

 

static inline void __run_timers(struct tvec_base *base)
{
  struct timer_list *timer;

  spin_lock_irq(&base->lock);
  while (time_after_eq(jiffies, base->timer_jiffies)) {------------------同步jiffies,在NO_HZ情况下,base->timer_jiffies可能落后不止一个tick
    struct list_head work_list;
    struct list_head *head = &work_list;
    int index = base->timer_jiffies & TVR_MASK;---------------------计算到期定时器链表在tv1中的索引

    /*
    * Cascade timers:
    */
    if (!index &&--------------------------------------------------------index为0时,表示有进位到tv2
      (!cascade(base, &base->tv2, INDEX(0))) &&--------------返回为0时,表示有进位到tv3
        (!cascade(base, &base->tv3, INDEX(1))) &&---------返回为0时,表示有进位到tv4
          !cascade(base, &base->tv4, INDEX(2)))----------返回为0时,表示有进位到tv5
      cascade(base, &base->tv5, INDEX(3));---------------------从低位到高位逐级进位的优美,在进位时将高位timer_list插入到低位
    ++base->timer_jiffies;----------------------------------------------该cpu定时器系统运行时间递增一个tick
    list_replace_init(base->tv1.vec + index, &work_list);----------取出到期的定时器链表
    while (!list_empty(head)) {----------------------------------------遍历所有的到期定时器
      void (*fn)(unsigned long);
      unsigned long data;

      timer = list_first_entry(head, struct timer_list,entry);
      fn = timer->function;
      data = timer->data;

      timer_stats_account_timer(timer);

      base->running_timer = timer;-------------------------------标记正在处理的定时器
      detach_timer(timer, 1);

      spin_unlock_irq(&base->lock);
      call_timer_fn(timer, fn, data);--------------------------------调用定时器的回调函数
      spin_lock_irq(&base->lock);
    }
  }
  base->running_timer = NULL;
  spin_unlock_irq(&base->lock);
}

Notes:

假设当前的jiffies是0x34567826,定时器分组数据如下:

Linux时间子系统之五:低分辨率定时器的原理和实现

基于当前时间,我们假设系统中存在如下一系列定时器:

timer_1=0x34567828
timer_2=0x34567840
timer_3=0x34567928
timer_4=0x34567940
timer_5=0x34567d00
timer_6=0x34567d0f
timer_7=0x34568123
timer_8=0x3456830f
timer_9=0x34987f28
timer_a=0x3498ff28
timer_b=0x82567f28

那么它们在当前时间轮上处于什么位置呢?首先所有的timer_jiffies必须大于0x34567826。因为定时器时间轮是基于timer_jiffies来一级一级增长的,而低精度定时器的expires也是基于jiffies的,所以随着时间的流逝,expires可用范围越来越小。

结合__run_timers中的cascade,每当低一级分组进位时,得出上一级分组index,通过index就将上一级list_head取出,然后插入到本级分组。因为上一级每一个index分组内容,刚好可以填满本级分组。

所以Cascade Timers是按照index-->tv2-->tv3-->tv4-->t5顺序来判断,进位是从低位到高位传递。

Linux时间子系统之五:低分辨率定时器的原理和实现

通过上面的讨论,我们可以发现,内核的低分辨率定时器的实现非常精妙,既实现了大量定时器的管理,又实现了快速的O(1)查找到期定时器的能力,利用巧妙的数组结构,使得只需在间隔256个tick时间才处理一次迁移操作,5个数组就好比是5个齿轮,它们随着base->timer_jifffies的增长而不停地转动,每次只需处理第一个齿轮的某一个齿节,低一级的齿轮转动一圈,高一级的齿轮转动一个齿,同时自动把即将到期的定时器迁移到上一个齿轮中,所以低分辨率定时器通常又被叫做时间轮:time wheel。事实上,它的实现是一个很好的空间换时间软件算法。

3.  定时器软件中断

Notes:注册TIMER_SOFTIRQ路径是start_kernel-->init_timers-->open_softirq(TIMER_SOFTIRQ, run_timer_softirq)。

触发TIMER_SOFTIRQ的地方有几处:

1.如果当前是周期性TIck,在tick_handle_periodic-->tick_periodic-->update_process_times-->run_local_timers-->raise_softirq(TIMER_SOFTIRQ)-->run_timer_softirq-->__run_timers

2.在NOHZ模式下,tick_nohz_switch_to_nohz将clock_event_device的event_handler切换成tick_nohz_handler。

3.使用hrtimer模拟低精度timer时hrtimer_switch_to_hres-->tick_setup_sched_timer将tick_sched定时器超时函数设为tick_sched_timer。tick_sched_timer-->update_process_times。

系统初始化时,start_kernel会调用定时器系统的初始化函数init_timers:

  1. void __init init_timers(void)
  2. {
  3. int err = timer_cpu_notify(&timers_nb, (unsigned long)CPU_UP_PREPARE,
  4. (void *)(long)smp_processor_id());
  5. init_timer_stats();
  6. BUG_ON(err != NOTIFY_OK);
  7. register_cpu_notifier(&timers_nb);  /* 注册cpu notify,以便在hotplug时在cpu之间进行定时器的迁移 */
  8. open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
  9. }

可见,open_softirq把run_timer_softirq注册为TIMER_SOFTIRQ的处理函数,另外,当cpu的每个tick事件到来时,在事件处理中断中,update_process_times会被调用,该函数会进一步调用run_local_timers,run_local_timers会触发TIMER_SOFTIRQ软中断:

  1. void run_local_timers(void)
  2. {
  3. hrtimer_run_queues();
  4. raise_softirq(TIMER_SOFTIRQ);
  5. }

TIMER_SOFTIRQ的处理函数是run_timer_softirq:

  1. static void run_timer_softirq(struct softirq_action *h)
  2. {
  3. struct tvec_base *base = __this_cpu_read(tvec_bases);
  4. hrtimer_run_pending();
  5. if (time_after_eq(jiffies, base->timer_jiffies))
  6. __run_timers(base);
  7. }

好啦,终于看到__run_timers函数了,2.2节已经介绍过,正是这个函数完成了对到期定时器的处理工作,也完成了时间轮的不停转动。