ThreadX——IPC应用之消息队列

时间:2024-02-01 14:14:53
  • 作者:zzssdd2

  • E-mail:zzssdd2@foxmail.com

一、应用简介

消息队列是RTOS中常用的一种数据通信方式,常用于任务与任务之间或是中断与任务之间的数据传递。在裸机系统中我们通常会使用全局变量的方式进行数据传递,比如在事件发生后在中断中改变数据和设置标志,然后在主循环中轮询不同的标志是否生效来对全局数据执行不同的操作,执行完毕后清除相关标志。但是这种方式需要不断地轮询标志状态,使得CPU的利用率并不高。而使用RTOS的消息队列则具有任务阻塞机制,当没有需要处理的消息时任务挂起等待消息,此时其他任务占用CPU执行其他操作,当有消息放入队列时任务恢复运行进行消息接收和处理。这种消息处理机制相比裸机而言大大地提高了CPU利用率。

  • ThreadX的消息队列支持“消息置顶通知”功能,也就是可以将消息放在队列的最前面,使得任务可以及时处理某些紧急消息(RT-Thread的消息队列也有该功能)
  • ThreadX的消息队列可以传递任意长度的数据,因为它是采用传递数据指针的方式(uCOS也是采用这种引用传递的方式,而FreeRTOS和RT-Thread则支持传递整体数据内容。这两种方式各有优劣吧,指针传递方式优点是执行效率高,缺点是存数据的内存区域如果数据还未及时处理就被覆写了那么就会引发问题;整体数据传递方式优点是安全不需担心数据覆写致错,缺点是数据量大的话传递数据过程执行时间长导致效率低)

二、API简介

下面介绍使用ThreadX的消息队列时常用的几个API函数。

1、创建消息队列

  • 描述
    • 该服务用于创建消息队列。 消息总数是根据指定的消息大小和队列中的字节总数来计算的
    • 如果在队列的内存区域中指定的字节总数不能被指定的消息大小均分,则不会使用该内存区域中的其余字节
  • 参数
    • queue_ptr 指向消息队列控制块的指针
    • name_ptr 指向消息队列名称的指针
    • message_size 指定队列中每条消息的大小。 消息大小选项为1个32位字到16个32位字之间(包含)
    • queue_start 消息队列的起始地址。 起始地址必须与ULONG数据类型的大小对齐
    • queue_size 消息队列可用的字节总数
  • 返回值
    • TX_SUCCESS (0x00) 创建成功
    • TX_QUEUE_ERROR (0x09) 无效的消息队列指针,指针为NULL或队列已创建
    • TX_PTR_ERROR (0x03) 消息队列的起始地址无效
    • TX_SIZE_ERROR (0x05) 消息队列大小无效
    • TX_CALLER_ERROR (0x13) 该服务的调用者无效
UINT tx_queue_create(
    TX_QUEUE *queue_ptr, 
    CHAR *name_ptr,
    UINT message_size,
    VOID *queue_start, 
    ULONG queue_size);

2、删除消息队列

  • 描述
    • 此服务删除指定的消息队列。所有挂起等待此队列消息的线程都将恢复,并给出TX_DELETED返回状态
    • 在删除队列之前,应用程序必须确保已完成(或禁用)此队列的所有send_notify回调。 此外,应用程序必须防止将来使用已删除的队列
    • 应用程序还负责管理与队列相关联的内存区域,该内存区域在此服务完成后可用
  • 参数
    • queue_ptr 指向先前创建的消息队列的指针
  • 返回值
    • TX_SUCCESS (0x00) 删除成功
    • TX_QUEUE_ERROR (0x09) 消息队列指针无效
    • TX_CALLER_ERROR (0x13) 该服务的调用者无效
UINT tx_queue_delete(TX_QUEUE *queue_ptr);

3、清空消息队列

  • 描述
    • 此服务删除存储在指定消息队列中的所有消息
    • 如果队列已满,将丢弃所有挂起线程的消息,然后恢复每个挂起的线程,并返回一个指示消息发送成功的返回状态。如果队列为空,则此服务不执行任何操作。
  • 参数
    • queue_ptr 指向先前创建的消息队列的指针
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_QUEUE_ERROR (0x09) 消息队列指针无效
UINT tx_queue_flush(TX_QUEUE *queue_ptr);

4、消息置顶

  • 描述
    • 该服务将消息发送到指定消息队列的最前面。 消息从源指针指定的存储区域复制到队列的最前面
  • 参数
    • queue_ptr 指向消息队列控制块的指针
    • source_ptr 指向存放消息的指针
    • wait_option 定义消息队列已满时服务的行为
      • TX_NO_WAIT (0x00000000) - 无论是否成功都立即返回(用于非线程调用,例如中断里面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到消息队列有空闲为止
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_DELETED (0x01) 线程挂起时,消息队列被删除
    • TX_QUEUE_FULL (0x0B) 服务无法发送消息,因为在指定的等待时间内队列已满
    • TX_WAIT_ABORTED (0x1A) 被另一个线程、计时器或ISR中断给中止
    • TX_QUEUE_ERROR (0x09) 无效的消息队列指针
    • TX_PTR_ERROR (0x03) 消息的源指针无效
    • TX_WAIT_ERROR (0x04) 在非线程调用中指定了TX_NO_WAIT以外的等待选项
UINT tx_queue_front_send(
    TX_QUEUE *queue_ptr,
    VOID *source_ptr, 
    ULONG wait_option);

5、获取消息队列信息

  • 描述
    • 该服务检索有关指定消息队列的信息
  • 参数(TX_NULL表示不需要获取该参数代表的信息)
    • queue_ptr 指向先前创建的消息队列的指针
    • name 指向目标的指针,用于指向队列名称
    • enqueued 指向目标的指针,表示当前队列中的消息数
    • available_storage 指向目标的指针,表示队列当前有空间容纳的消息数
    • first_suspended 指向目标的指针,该指针指向该队列的挂起列表中第一个线程
    • suspended_count 指向目标的指针,用于指示当前在此队列上挂起的线程数
    • next_queue 指向下一个创建队列的指针的目标的指针
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_QUEUE_ERROR (0x09) 无效的消息队列指针
UINT tx_queue_info_get(
    TX_QUEUE *queue_ptr, 
    CHAR **name,
    ULONG *enqueued, 
    ULONG *available_storage
    TX_THREAD **first_suspended, 
    ULONG *suspended_count,
    TX_QUEUE **next_queue);

6、从队列获取消息

  • 描述
    • 该服务从指定的消息队列中检索消息。 检索到的消息从队列复制到目标指针指定的存储区域。 然后将该消息从队列中删除
    • 指定的目标存储区必须足够大以容纳消息。 也就是说,由destination_ptr 指向的消息目标必须至少与此队列的消息大小一样大。 否则,如果目标不够大,则会在存储区域中发生内存地址非法错误
  • 参数
    • queue_ptr 指向先前创建的消息队列的指针
    • destination_ptr 指向储存消息的地址
    • wait_option 定义消息队列为空时服务的行为
      • TX_NO_WAIT (0x00000000) - 无论是否成功都立即返回(用于非线程调用,例如中断里面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到有消息可以获取
      • 0x00000001 ~ 0xFFFFFFFE- 指定具体等待心跳节拍数(如果心跳频率1KHZ,那么单位就是ms )
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_DELETED (0x01) 线程挂起时删除了消息队列
    • TX_QUEUE_EMPTY (0x0A) 服务无法检索消息,因为队列在指定的等待时间段内为空
    • TX_WAIT_ABORTED (0x1A) 被另一个线程、计时器或ISR中断给中止
    • TX_QUEUE_ERROR (0x09) 无效的消息队列指针
    • TX_PTR_ERROR (0x03) 消息的目标指针无效
    • TX_WAIT_ERROR (0x04) 在非线程调用中指定了TX_NO_WAIT以外的等待选项
UINT tx_queue_receive(
    TX_QUEUE *queue_ptr,
    VOID *destination_ptr, 
    ULONG wait_option);

7、向队列发送消息

  • 描述
    • 此服务将消息发送到指定的消息队列。发送的消息将从源指针指定的内存区域复制到队列中。
  • 参数
    • queue_ptr 指向先前创建的消息队列的指针
    • source_ptr 指向消息的指针
    • wait_option 定义消息队列已满时服务的行为
      • TX_NO_WAIT (0x00000000) - 无论是否成功都立即返回(用于非线程调用,例如中断里面)
      • TX_WAIT_FOREVER (0xFFFFFFFF) - 一直等待直到队列有空位可以放置消息
      • 0x00000001 ~ 0xFFFFFFFE - 指定具体等待心跳节拍数(如果心跳频率1KHZ,那么单位就是ms )
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_DELETED (0x01) 线程挂起时删除了消息队列
    • TX_QUEUE_FULL (0x0B) 服务无法发送消息,因为队列在指定的等待时间内已满
    • TX_WAIT_ABORTED (0x1A) 被另一个线程、计时器或ISR中断给中止
    • TX_QUEUE_ERROR (0x09) 无效的消息队列指针
    • TX_PTR_ERROR (0x03) 消息的目标指针无效
    • TX_WAIT_ERROR (0x04) 在非线程调用中指定了TX_NO_WAIT以外的等待选项
UINT tx_queue_send(
    TX_QUEUE *queue_ptr,
    VOID *source_ptr, 
    ULONG wait_option);

8、注册发送通知回调函数

  • 描述
    • 此服务注册一个通知回调函数,每当一条消息发送到指定的队列时就会调用该函数。 通知回调的处理由应用程序定义
    • 不允许在应用程序的队列发送通知回调函数中调用具有暂停选项的ThreadX API
  • 参数
    • queue_ptr 指向先前创建的队列的指针
    • queue_send_notify 指向应用程序队列发送通知功能的指针。 如果此值为TX_NULL,则禁用通知
  • 返回值
    • TX_SUCCESS (0x00) 操作成功
    • TX_QUEUE_ERROR (0x09) 无效的队列指针
    • TX_FEATURE_NOT_ENABLED (0xFF) 禁用了通知功能
UINT tx_queue_send_notify(
    TX_QUEUE *queue_ptr,
    VOID (*queue_send_notify)(TX_QUEUE *));

三、实例演示

  • 该应用实例创建三个任务和一个队列消息发送通知回调
  • 任务1:按键1按一次向消息队列1发送一条消息(单个变量消息)
  • 任务2:按键2按一次向消息队列2发送一条消息(结构体指针消息)
  • 任务3:向消息队列3发送消息;接收任务1和任务2的消息并打印输出消息内容
  • 回调功能:输出消息队列3的相关信息

创建消息队列

#define DEMO_STACK_SIZE         (2 * 1024)
#define DEMO_BYTE_POOL_SIZE     (32 * 1024)

TX_THREAD       thread_0;
TX_THREAD		thread_1;
TX_THREAD		thread_2;

TX_BYTE_POOL	byte_pool_0;
UCHAR			memory_area[DEMO_BYTE_POOL_SIZE];

/* 消息队列 */
TX_QUEUE        tx_queue1;
TX_QUEUE        tx_queue2;
TX_QUEUE        tx_queue3;

ULONG           msg_queue1[32];
ULONG           msg_queue2[16];
ULONG           msg_queue3[8];

struct S_DATA{
    uint32_t id;
    uint16_t flag;
    uint8_t msg[2];
};
struct S_DATA data_package;

void thread_0_entry(ULONG thread_input);
void thread_1_entry(ULONG thread_input);
void thread_2_entry(ULONG thread_input);
void queue3_send_notify(TX_QUEUE *input);
void tx_application_define(void *first_unused_memory)
{
	CHAR    *pointer = TX_NULL;
	
	/* Create a byte memory pool from which to allocate the thread stacks. */
    tx_byte_pool_create(&byte_pool_0, "byte pool 0", memory_area, DEMO_BYTE_POOL_SIZE);
	
	/* Allocate the stack for thread 0. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create the main thread. */
    tx_thread_create(&thread_0, "thread 0", thread_0_entry, 0,  
            pointer, DEMO_STACK_SIZE, 
            1, 1, TX_NO_TIME_SLICE, TX_AUTO_START);
	
	/* Allocate the stack for thread 1. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create threads 1 */
    tx_thread_create(&thread_1, "thread 1", thread_1_entry, 0,  
            pointer, DEMO_STACK_SIZE, 
            2, 2, TX_NO_TIME_SLICE, TX_AUTO_START);
			
	/* Allocate the stack for thread 2. */
    tx_byte_allocate(&byte_pool_0, (VOID **) &pointer, DEMO_STACK_SIZE, TX_NO_WAIT);
    /* Create threads 1 */
    tx_thread_create(&thread_2, "thread 2", thread_2_entry, 0,  
            pointer, DEMO_STACK_SIZE, 
            3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);
            
    /* 创建消息队列 */
    tx_queue_create(&tx_queue1, "tx_queue1", 1, msg_queue1, sizeof(msg_queue1));
    tx_queue_create(&tx_queue2, "tx_queue2", 1, msg_queue2, sizeof(msg_queue2));
    tx_queue_create(&tx_queue3, "tx_queue2", 1, msg_queue3, sizeof(msg_queue3));
    /* 注册发送消息回调 */
    tx_queue_send_notify(&tx_queue3, queue3_send_notify);
}

任务1

void    thread_0_entry(ULONG thread_input)
{
    uint8_t i =0, key_flag = 0;
    uint8_t data_single = 0;
    
    while(1)
    {
        if (0 == key_flag)
        {
            if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY1_GPIO_Port, KEY1_Pin))
            {
                key_flag = 1;
            }
        }
        else
        {
            if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY1_GPIO_Port,KEY1_Pin))
			{
				key_flag = 0;
                /*按键1触发,向队列1发送消息*/
                data_single++;
                tx_queue_send(&tx_queue1, &data_single, TX_NO_WAIT);
			}
        }
		tx_thread_sleep(20);
    }
}

任务2

void    thread_1_entry(ULONG thread_input)
{
    uint8_t key_flag = 0;
    struct S_DATA *pData;
    
    pData = &data_package;
    pData->id       = 1;
    pData->flag     = 2;
    pData->msg[0]   = 3;
    pData->msg[1]   = 4;
    
    while(1)
    {
		if (0 == key_flag)
        {
            if (GPIO_PIN_SET == HAL_GPIO_ReadPin(KEY2_GPIO_Port, KEY2_Pin))
            {
                key_flag = 1;
            }
        }
        else
        {
            if (GPIO_PIN_RESET == HAL_GPIO_ReadPin(KEY2_GPIO_Port,KEY2_Pin))
			{
				key_flag = 0;
                /*按键2触发,向队列2发送消息*/
                pData->id       += 8;
                pData->flag     += 4;
                pData->msg[0]   += 2;
                pData->msg[1]   += 1;
                tx_queue_send(&tx_queue2, &pData, TX_NO_WAIT);
			}
        }
		tx_thread_sleep(20);
    }
}

任务3

void    thread_2_entry(ULONG thread_input)
{
    UINT status;
    uint8_t char_data;
    ULONG long_data = 0;
    struct S_DATA *buf_data;
    
	while(1)
	{
        /* 向队列3发送消息 */
        long_data++;
        tx_queue_send(&tx_queue3, &long_data, TX_NO_WAIT);
        if (0 == (long_data & 7))
        {
            tx_queue_flush(&tx_queue3);
        }
        
        /* 接收队列1消息 */
		status = tx_queue_receive(&tx_queue1, &char_data, 1000);
        if (TX_SUCCESS == status)
        {
            SEGGER_RTT_SetTerminal(0);
            SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_GREEN"message queue1 receive data is %d\r\n", char_data);
        }
        
        /* 接收队列2消息 */
        status = tx_queue_receive(&tx_queue2, &buf_data, 1000);
        if (TX_SUCCESS == status)
        {
            SEGGER_RTT_SetTerminal(1);
            SEGGER_RTT_printf(0, RTT_CTRL_TEXT_BRIGHT_YELLOW"message queue2 receive data is %d\t%d\t%d\t%d \r\n", \
                            buf_data->id, \
                            buf_data->flag, \
                            buf_data->msg[0], \
                            buf_data->msg[1]);
        }
	} 	
}

发送队列消息回调功能

void    queue3_send_notify(TX_QUEUE *input)
{
    ULONG enqueued;             // 队列中的消息数
    ULONG available_storage;    // 队列剩余空间
    
    tx_queue_info_get(&tx_queue3, TX_NULL, &enqueued, &available_storage, TX_NULL, TX_NULL, TX_NULL);
    
    SEGGER_RTT_SetTerminal(2);
    SEGGER_RTT_printf(0, "the number of messages in the queue3 %d\r\n", enqueued);
    SEGGER_RTT_printf(0, "the queue3 remaining size %d\r\n", available_storage);
}

任务1演示结果

任务2演示结果

任务3演示结果

注:关于使用SEGGER_RTT打印功能可以参考这篇笔记:https://www.cnblogs.com/zzssdd2/p/14162382.html