rt-thread的IPC机制之信号量源码分析

时间:2022-01-06 20:39:19

rt-thread操作系统的IPC(Inter-Process Communication,进程间通信)包含有信号量,互斥锁,事件,邮箱,消息队列.

本文主要针对信号量.信号量是用来解决线程同步和互斥的通用工具,和互斥量类似,信号量也可用作资源互斥访问,但信号量没有所有者的概念,在应用上比互斥量更广泛。信号量比较简单,不能解决优先级翻转问题,但信号量是一种轻量级的对象,比互斥量小巧、灵活。因此在很多对互斥要求不严格的系统中(或者不会造成优先级翻转的情况下),经常使用信号量来管理互斥资源。

1 信号量控制块

/**
* Semaphore structure
*/
struct rt_semaphore
{
struct rt_ipc_object parent; /**< inherit from ipc_object *///派生自IPC对象

rt_uint16_t value; /**< value of semaphore. *///信号量计数器
};
typedef struct rt_semaphore *rt_sem_t;

value为信号计数器,此信号量多次被释放时将会累加,在被获取时则将减1,当其值为0时,再请求获取的线程将会被挂起到挂起链表中。

parent为一rt_ipc_object即IPC对象,其定义如下:

/**
* Base structure of IPC object
*/
struct rt_ipc_object
{
struct rt_object parent; /**< inherit from rt_object *///可知其派生自内核对象

rt_list_t suspend_thread; /**< threads pended on this resource *///线程挂起链表
};

从rt_ipc_object的定义结构可知其派生自rt_object结构,即内核对象的定义(参考http://blog.csdn.net/flydream0/article/details/8568463),而其它IPC,如互斥锁,事件,邮箱,消息队列都是从rt_ipc_object派生。

另外,IPC对象还包含一挂起链表,用来保存因此IPC对象而挂起的线程.

2 信号量的创建与初始化

2.1 初始化

/**
* This function will initialize a semaphore and put it under control of
* resource management.
*
* @param sem the semaphore object
* @param name the name of semaphore
* @param value the init value of semaphore
* @param flag the flag of semaphore
*
* @return the operation status, RT_EOK on successful
*/
rt_err_t rt_sem_init(rt_sem_t sem,
const char *name,
rt_uint32_t value,
rt_uint8_t flag)
{
RT_ASSERT(sem != RT_NULL);

/* init object */
rt_object_init(&(sem->parent.parent), RT_Object_Class_Semaphore, name);//初始化信号量的内核对象

/* init ipc object */
rt_ipc_object_init(&(sem->parent));//初始化信号量的IPC对象

/* set init value */
sem->value = value;//设置信号量计数器的值

/* set parent */
sem->parent.parent.flag = flag;//设置信号量的内核对象的标志

return RT_EOK;
}

其中rt_object_init已在之前介绍rt-thread的内核对象中相关文章中已有介绍(参见: http://blog.csdn.net/flydream0/article/details/8568463),rt_ipc_object_init函数见如下:

/**
* @addtogroup IPC
*/

/*@{*/

/**
* This function will initialize an IPC object
*
* @param ipc the IPC object
*
* @return the operation status, RT_EOK on successful
*/
rt_inline rt_err_t rt_ipc_object_init(struct rt_ipc_object *ipc)
{
/* init ipc object */
rt_list_init(&(ipc->suspend_thread));//初始化线程挂起链表

return RT_EOK;
}
初始化及创建信号量很简单,一个是静态初始化,一个是动态分配的然后再初始化,不做过多解释.

2.2 创建信号量

/**
* This function will create a semaphore from system resource
*
* @param name the name of semaphore
* @param value the init value of semaphore
* @param flag the flag of semaphore
*
* @return the created semaphore, RT_NULL on error happen
*
* @see rt_sem_init
*/
rt_sem_t rt_sem_create(const char *name, rt_uint32_t value, rt_uint8_t flag)
{
rt_sem_t sem;

RT_DEBUG_NOT_IN_INTERRUPT;//确保此函数不是在中断中使用

/* allocate object */
sem = (rt_sem_t)rt_object_allocate(RT_Object_Class_Semaphore, name);//动态分配内核对象
if (sem == RT_NULL)
return sem;

/* init ipc object */
rt_ipc_object_init(&(sem->parent));//初始化信号量的IPC对象

/* set init value */
sem->value = value;//初始化信号量的计数器值

/* set parent */
sem->parent.parent.flag = flag;//设置信号量的内核对象的标志

return sem;
}

3 脱离及删除信号量

3.1 脱离信号量

/**
* This function will detach a semaphore from resource management
*
* @param sem the semaphore object
*
* @return the operation status, RT_EOK on successful
*
* @see rt_sem_delete
*/
rt_err_t rt_sem_detach(rt_sem_t sem)
{
RT_ASSERT(sem != RT_NULL);

/* wakeup all suspend threads */
rt_ipc_list_resume_all(&(sem->parent.suspend_thread));//唤醒所有信号量内挂起的线程

/* detach semaphore object */
rt_object_detach(&(sem->parent.parent));//脱离信号量的内核对象

return RT_EOK;
}

脱离信号量时被将挂起链表中的所有线程都唤醒,其中rt_ipc_list_resume_all函数如下:

/**
* This function will resume all suspended threads in a list, including
* suspend list of IPC object and private list of mailbox etc.
*
* @param list of the threads to resume
*
* @return the operation status, RT_EOK on successful
*/
rt_inline rt_err_t rt_ipc_list_resume_all(rt_list_t *list)
{
struct rt_thread *thread;
register rt_ubase_t temp;

/* wakeup all suspend threads */
while (!rt_list_isempty(list))//遍历线程挂起链表
{
/* disable interrupt */
temp = rt_hw_interrupt_disable();//关中断

/* get next suspend thread */
thread = rt_list_entry(list->next, struct rt_thread, tlist);//获得线程
/* set error code to RT_ERROR */
thread->error = -RT_ERROR;//设置线程的错误码为-RT_ERROR

/*
* resume thread
* In rt_thread_resume function, it will remove current thread from
* suspend list
*/
rt_thread_resume(thread);//唤醒此线程

/* enable interrupt */
rt_hw_interrupt_enable(temp);//开中断
}

return RT_EOK;
}
需要注意地是,被脱离的信号量时唤醒的线程的error值将会被设置为-RT_ERROR,以此标志此线程是被异常唤醒,并不是正常获取到信号量而被唤醒,这在take函数中将会以线程的error值来进行判断.

3.2 删除线程

/**
* This function will delete a semaphore object and release the memory
*
* @param sem the semaphore object
*
* @return the error code
*
* @see rt_sem_detach
*/
rt_err_t rt_sem_delete(rt_sem_t sem)
{
RT_DEBUG_NOT_IN_INTERRUPT;//确保此函数不是在中断中使用

RT_ASSERT(sem != RT_NULL);

/* wakeup all suspend threads */
rt_ipc_list_resume_all(&(sem->parent.suspend_thread));//唤醒所有挂起的线程

/* delete semaphore object */
rt_object_delete(&(sem->parent.parent));//删除信号量内核对象

return RT_EOK;
}
删除信号量与脱离信号量类似,说明路过。

4 获取信号量

4.1 等待信号量

/**
* This function will take a semaphore, if the semaphore is unavailable, the
* thread shall wait for a specified time.
*
* @param sem the semaphore object
* @param time the waiting time
*
* @return the error code
*/
rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;

RT_ASSERT(sem != RT_NULL);

RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent)));

/* disable interrupt */
temp = rt_hw_interrupt_disable();//关中断

RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s take sem:%s, which value is: %d\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));

if (sem->value > 0)//如果此信号量的计数器的值大于0,说明有信号,则应该立即返回
{
/* semaphore is available */
sem->value --;//则将信号量的计数器的值减1

/* enable interrupt */
rt_hw_interrupt_enable(temp);//开中断
}
else//如果此信号量的计数器的值小于或等于0,说明此时还未有信号
{
/* no waiting, return with timeout */
if (time == 0)//如果等待时间参数为0,则立即返回超时错误
{
rt_hw_interrupt_enable(temp);//开中断

return -RT_ETIMEOUT;//返回超时错误
}
else//等待信号
{
/* current context checking */
RT_DEBUG_NOT_IN_INTERRUPT;//确保此时不在中断中使用

/* semaphore is unavailable, push to suspend list */
/* get current thread */
thread = rt_thread_self();//获取当前正在运行的线程

/* reset thread error number */
thread->error = RT_EOK;//设置当前线程的错误代码为RT_EOK,需要注意这里

RT_DEBUG_LOG(RT_DEBUG_IPC, ("sem take: suspend thread - %s\n",
thread->name));

/* suspend thread */
rt_ipc_list_suspend(&(sem->parent.suspend_thread),//挂起当前线程到信号量中的断起线程链表
thread,
sem->parent.parent.flag);

/* has waiting time, start thread timer */
if (time > 0)//如果时间参数大于0
{
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer list\n",
thread->name));

/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),//设置定时器
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));//启动定时器开始计时
}

/* enable interrupt */
rt_hw_interrupt_enable(temp);//开中断

/* do schedule */
rt_schedule();//当前线程已挂起,需要重新调试线程
//rt_schedule之后再执行到这里,只有两种可能,一是当前线程被挂起后时间已到达,此时,定时器的超时回调处理函数会将此线程的err值设为-RT_ETIMEOU,见thread.c源文件中的rt_thread_timeout函数;另一种情况是,有信号量到来,当前线程被rt_sem_release函数唤醒,此时,此线程的err值将一直保持原样不变,因此可以下面可能通过判断线程的err值来判断当前线程是否已被接收到信号量
if (thread->error != RT_EOK)//如果当前线程的错误代码不为RT_EOK,则返回,否则一直阻塞到等待到有信号到达或超时
{
return thread->error;
}
}
}

RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent)));

return RT_EOK;
}

获取信号量函数首先会判断当前是否有信号量(通过value值来判断),如果有则立即成功返回,如果没有,则接下来首先判断是否有时间参数,如果等待时间参数为0,则表示需要立即返回,则立即返回错误。如果等待时间参数大于0,则表示需要延时一段时间,在此延时期间,如何信号量到达,或者信号量被非法脱离,或一直没有等到,则通过判断线程的error值来判断当前是否已经成功获得信号值,因为如果成功获得信号量时,即在另一个线程release信号后,因此这一整个过程并没有修改线程的error值,因此,线程的error值一直保持原先的RT_EOK不变。若是线程一直没有等待到信号量的到达,即产生的定时器超时时(在take过程中会将设置一定时器,然后启动它,再挂起当前线程),在线程定时器的回调超时处理函数中,程序会将线程的error值修改为-RT_ETIMEOUT。另,如果之前讲解脱离线程时,如果在某一线程等待信号量期间,这个信号量被意外脱离了时,在脱离信号量的函数中(见3.1节),程序会将线程的error值修改为-RT_ERROR。

    综上所述,程序可以通过线程的error值来对其是否真正获得信号量进行判定,即如果线程的error值一直保持原样即thread->error==RT_EOK时,则为已获取信号量,否则没有获取,要么超时(-RT_ETIMEOUT),要么非法脱离(-RT_ERROR)了。


另外,当当前线程没有等到信号量时,程序会调用rt_ipc_list_suspend让当前线程挂起,这个挂起是指将当前线程加入到信号量的挂起链表中,这里有一个flag参数,即sem->parent.parent.flag(在信号量初始化时设置,见2.1节),其值有两种RT_IPC_FLAG_FIFO,RT_IPC_FLAG_FIFO,前者表示按FIFO的方式放入挂起链表,后者是根据线程本身的优先级等级来决定放入到挂起链表的位置,由于每次释放一个信号量,只会从信号量挂起链表上唤醒第一个线程(见第5章),因此,挂起线程链表上的位置就决定了当信号到达时挂起的线程的唤醒顺序。


rt_ipc_list_suspend的源码如下:

/**
* This function will suspend a thread to a specified list. IPC object or some
* double-queue object (mailbox etc.) contains this kind of list.
*
* @param list the IPC suspended thread list
* @param thread the thread object to be suspended
* @param flag the IPC object flag,
* which shall be RT_IPC_FLAG_FIFO/RT_IPC_FLAG_PRIO.
*
* @return the operation status, RT_EOK on successful
*/
rt_inline rt_err_t rt_ipc_list_suspend(rt_list_t *list,
struct rt_thread *thread,
rt_uint8_t flag)
{
/* suspend thread */
rt_thread_suspend(thread);//挂起线程

switch (flag)
{
case RT_IPC_FLAG_FIFO://FIFO方式
rt_list_insert_before(list, &(thread->tlist));//直接放入队列末尾
break;

case RT_IPC_FLAG_PRIO://按线程优先级方式
{
struct rt_list_node *n;
struct rt_thread *sthread;

/* find a suitable position */
for (n = list->next; n != list; n = n->next)//遍历信号量的挂起链表
{
sthread = rt_list_entry(n, struct rt_thread, tlist);

/* find out */
if (thread->current_priority < sthread->current_priority)//按优先级找到合适位置
{
/* insert this thread before the sthread */
rt_list_insert_before(&(sthread->tlist), &(thread->tlist));//将线程加入到链表中
break;
}
}

/*
* not found a suitable position,
* append to the end of suspend_thread list
*/
if (n == list)
rt_list_insert_before(list, &(thread->tlist));//没有找到合适位置,则放到末尾
}
break;
}

return RT_EOK;
}



4.2 获取无等待信号量

/**
* This function will try to take a semaphore and immediately return
*
* @param sem the semaphore object
*
* @return the error code
*/
rt_err_t rt_sem_trytake(rt_sem_t sem)
{
return rt_sem_take(sem, 0);
}

由此可见,rt_sem_trytake只是rt_sem_take函数的一种特例,时间参数为0而已.

5 释放信号量

/**
* This function will release a semaphore, if there are threads suspended on
* semaphore, it will be waked up.
*
* @param sem the semaphore object
*
* @return the error code
*/
rt_err_t rt_sem_release(rt_sem_t sem)
{
register rt_base_t temp;
register rt_bool_t need_schedule;

RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent)));

need_schedule = RT_FALSE;//默认情况下设置不需要重新调度标记

/* disable interrupt */
temp = rt_hw_interrupt_disable();//关中断

RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s releases sem:%s, which value is: %d\n",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));

if (!rt_list_isempty(&sem->parent.suspend_thread))//挂起线程不为空
{
/* resume the suspended thread */
rt_ipc_list_resume(&(sem->parent.suspend_thread));//唤醒第一个挂起的线程
need_schedule = RT_TRUE;//需要重新调度
}
else
sem->value ++; /* increase value *///信号量计数器加1

/* enable interrupt */
rt_hw_interrupt_enable(temp);//开中断

/* resume a thread, re-schedule */
if (need_schedule == RT_TRUE)//如果需要重新调度线程,则重新调度
rt_schedule();

return RT_EOK;
}

释放信号量只对将信号时的value值加1。

其中函数rt_ipc_list_resume只会唤醒信号量中第一个挂起的线程,其源码如下:

/**
* This function will resume the first thread in the list of a IPC object:
* - remove the thread from suspend queue of IPC object
* - put the thread into system ready queue
*
* @param list the thread list
*
* @return the operation status, RT_EOK on successful
*/
rt_inline rt_err_t rt_ipc_list_resume(rt_list_t *list)
{
struct rt_thread *thread;

/* get thread entry */
thread = rt_list_entry(list->next, struct rt_thread, tlist);//获取线程

RT_DEBUG_LOG(RT_DEBUG_IPC, ("resume thread:%s\n", thread->name));

/* resume it */
rt_thread_resume(thread);//唤醒此线程

return RT_EOK;
}

这里需要注意地是,释放信号量的过程不会修改线程的error值,即error原持原值RT_EOK不变.

6 信号量控制

/**
* This function can get or set some extra attributions of a semaphore object.
*
* @param sem the semaphore object
* @param cmd the execution command
* @param arg the execution argument
*
* @return the error code
*/
rt_err_t rt_sem_control(rt_sem_t sem, rt_uint8_t cmd, void *arg)
{
rt_ubase_t level;
RT_ASSERT(sem != RT_NULL);

if (cmd == RT_IPC_CMD_RESET)//重置信号量的计数器值
{
rt_uint32_t value;

/* get value */
value = (rt_uint32_t)arg;
/* disable interrupt */
level = rt_hw_interrupt_disable();//关中断

/* resume all waiting thread */
rt_ipc_list_resume_all(&sem->parent.suspend_thread);//唤醒信号量上所有挂起的线程

/* set new value */
sem->value = (rt_uint16_t)value;//设置信号时的计数器值

/* enable interrupt */
rt_hw_interrupt_enable(level);//开中断

rt_schedule();//立即重新调试

return RT_EOK;
}

return -RT_ERROR;
}


只支持重置信号量,此时若其存在挂起线程,则将其全部唤醒再次重新调度。此时在rt_ipc_list_resume_all函数中会将所有挂起的线程的error值设置为-RT_ERROR.