进程间通讯的开发者指南

时间:2022-05-20 20:22:54


进程/应用之间的通信(例如将传感器应用的传感器数据发送到姿态滤波应用)是PX4软件架构的关键部分。进程(通常又被叫做节点(node))通过命名为总线(buses)进行消息交换的过程被称作订阅主题(topics)。

PX4中,一个订阅主题仅包含一个消息类型,比如vehicle_attitude订阅主题传输一个包含姿态结构(滚转、俯仰和偏航估计)的消息。  

节点可以在总线/订阅主题上发布(publish一个消息(发送数据)或者订阅(subscribe一个总线/订阅主题(接收数据) 。应用并不知道在与谁通信,1个订阅主题可以有多个发布器和订阅器。

这种设计模式阻止了锁定的问题(locking issues),在机器人领域非常常见。为了使这更为高效,通常在总线上只有一个消息,并且不会有队列。

这种发布/订阅机制由微对象请求处理器microobject request broker (uORB)实现。

快速开始

这是一个简单但是完整的发布器(publisher /订阅器(subscriber)组合,发布器发布一个叫做random_integer 的订阅主题,并使用随机的整数更新订阅主题。订阅器检查并打印这些更新。

topic.h

[cpp] view plain copy print?
  1. /* declare the topic */  
  2. ORB_DECLARE(random_integer);  
  3. /* define the data structure that will be published where subscribers cansee it */  
  4. struct random_integer_data{  
  5.           int r;  
  6. };  

publisher.c

[cpp] view plain copy print?
  1. #include <topic.h>  
  2.  /* create topic metadata */  
  3. ORB_DEFINE(random_integer);   
  4. /* file handle that will be used for publishing */  
  5. staticint topic_handle;   
  6. int init()  
  7. {  
  8.           /* generate the initial data for first publication*/  
  9.           struct random_integer_data rd={ .r= random(),};  
  10.           /* advertise the topic and make the initialpublication */  
  11.           topic_handle = orb_advertise(ORB_ID(random_integer),&rd);  
  12. }  
  13. int update_topic()  
  14. {  
  15.           /* generate a new random number forpublication */  
  16.           struct random_integer_data rd={ .r= random(),};  
  17.           /* publish the new data structure */  
  18.           orb_publish(ORB_ID(random_integer), topic_handle,&rd);  
  19. }  


subscriber.c

[cpp] view plain copy print?
  1. #include <topic.h>  
  2.  /* file handle that will be used forsubscribing */  
  3. staticint topic_handle;  
  4. int init()  
  5. {  
  6.           /* subscribe to the topic */  
  7.           topic_handle = orb_subscribe(ORB_ID(random_integer));  
  8. }  
  9.  voidcheck_topic()  
  10. {  
  11.           bool updated;  
  12.           struct random_integer_data rd;   
  13.           /* check to see whether the topic has updatedsince the last time we read it */  
  14.           orb_check(topic_handle,&updated);  
  15.           if(updated){  
  16.                     /* make a local copy of the updated datastructure */  
  17.                     orb_copy(ORB_ID(random_integer), topic_handle,&rd);  
  18.                     printf("Random integer is now %d\n", rd.r);  
  19.           }  
  20. }  

发布过程

发布过程包含三个独立却相关联的动作:定义订阅主题,通告订阅主题,并且发布更新。

定义订阅主题

作为一种在元件之间提供通用接口的方式,系统定义了许多标准的订阅主题。如果一个发布器希望使用某一标准的订阅主题和其相关的数据结构,不需要再做额外的工作。

常规订阅主题

为了定义一个常规的订阅主题,发布器需要为订阅器建一个可用的头文件(上面的topic.h)。在这个头文件里必须包含:

  • 将订阅主题的名字作为参数的一个ORB_DECLARE() 宏实例。
  • 一个用于描述将发布数据结构的结构定义。

订阅主题的名字应该是描述性的;PX4的命名规则是是使用下划线分开订阅主题的名字;组件的名字应该首先使用通用的词汇。

例如,原始传感器数据在sensors_raw订阅主题中进行发布。

除了头文件,发布器必须包含一个ORB_DEFINE()宏实例在源文件中,该文件可以在构建固件时被编译和链接(见publisher.c示例)。这个宏来构建由ORB使用的数据结构,从而唯一表示这个topic

可选的订阅主题(optionaltopic

如果一个由软件组件发布的订阅主题是可选的,并且可能不会出现在固件中,头文件可以使用 ORB_DECLARE_OPTIONAL()宏代替。以这种方式声明的订阅主题需要由发布器进行特别的处理,但是还有一些以下需要讨论的考虑,订阅器在处理可选订阅主题时必须要注意。

通告(advertise)订阅主题

在数据发布到一个订阅主题时,必须首先通告。发布器使用如下的API通告一个新的订阅主题:

[cpp] view plain copy print?
  1. /** 
  2.  * Advertise as the publisher of atopic. 
  3.  * 
  4.  * This performs the initialadvertisement of a topic; it creates the topic 
  5.  * node in /obj if required andwrites the initial data. 
  6.  * 
  7.  * @param meta                 The uORB metadata (usually fromthe ORB_ID() macro) 
  8.  *                             forthe topic. topic uORB元数据(通常来自ORB_ID()宏) 
  9.  * @param data                 A pointer to the initial datato be published.指向将要publish初始数据的指针。 
  10.  *                             Fortopics published by interrupt handlers, the advertisement 
  11.  *                             mustbe performed from non-interrupt context. 
  12.  * @return           ERROR on error, otherwise returns a handle 
  13.  *                             thatcan be used to publish to the topic. 
  14.  *                             Ifthe topic in question is not known (due to an 
  15.  *                             ORB_DEFINE_OPTIONALwith no corresponding ORB_DECLARE) 
  16.  *                             thisfunction will return -1 and set errno to ENOENT. 
  17.  */  
  18. externint          orb_advertise(conststruct orb_metadata *meta,constvoid*data);  

通告同时为订阅主题发布初始数据。

APImeta参数是由ORB_DEFINE()宏产生的数据指针,通常使用ORB_ID() 宏提供,该宏执行从订阅主题名到元数据结构名的转换。

需要注意的是,可以从中断句柄中发布实时更新,但是通告订阅主题必须在常规线程的情况下。

多个发布器

一个发布器一次只能通告一个订阅主题,然而发布器可能会关闭订阅主题句柄(这是个文件句柄,可以简单的传递到close()函数),然后由另外一个发布器通告订阅主题。

发布更新

一个订阅主题被通告后,从通告那里返回的句柄可以使用如下的API,用于对订阅主题发布更新:

[cpp] view plain copy print?
  1. /** 
  2.  * Publish new data to a topic. 
  3.  * 
  4.  * The data is atomically publishedto the topic and any waiting subscribers 
  5.  * will be notified.  Subscribers that are not waiting can checkthe topic 
  6.  * for updates using orb_checkand/or orb_stat. 
  7.  * 
  8.  * @handle           The handle returned from orb_advertise. 
  9.  * @param meta                 The uORB metadata (usually fromthe ORB() macro) 
  10.  *                             forthe topic. 
  11.  * @param data                 A pointer to the data to bepublished. 
  12.  * @return           OK on success, ERROR otherwise with errno set accordingly. 
  13.  */  
  14. externint          orb_publish(conststruct orb_metadata *meta,int handle,constvoid*data);  

需要注意的是ORB并不缓存多个更新,当订阅器检查订阅主题时,只会看到最近的一个更新。

订阅

订阅一个订阅主题需要如下:

  •  一个ORB_DEFINE()或者ORB_DEFINE_OPTIONAL()宏(例如,在订阅器中包含的头文件)
  • 在订阅主题中发布的数据结构定义。(通常来自相同的头文件)

假如这些条件都满足,订阅器使用下面的API来订阅一个订阅主题。

[cpp] view plain copy print?
  1. /** 
  2.  * Subscribe to a topic. 
  3.  * 
  4.  * The returned value is a filedescriptor that can be passed to poll() 
  5.  * in order to wait for updates to atopic, as well as orb_read, 
  6.  * orb_check and orb_stat. 
  7.  * 
  8.  * Subscription will succeed even ifthe topic has not been advertised; 
  9.  * in this case the topic will havea timestamp of zero, it will never 
  10.  * signal a poll() event, checkingwill always return false and it cannot 
  11.  * be copied. When the topic issubsequently advertised, poll, check, 
  12.  * stat and copy calls will react tothe initial publication that is 
  13.  * performed as part of theadvertisement. 
  14.  * 
  15.  * Subscription will fail if thetopic is not known to the system, i.e. 
  16.  * there is nothing in the systemthat has defined the topic and thus it 
  17.  * can never be published. 
  18.  * 
  19.  * @param meta                 The uORB metadata (usually fromthe ORB_ID() macro) 
  20.  *                             forthe topic. 
  21.  * @return           ERROR on error, otherwise returns a handle 
  22.  *                             thatcan be used to read and check the topic for updates. 
  23.  *                             Ifthe topic in question is not known (due to an 
  24.  *                             ORB_DEFINE_OPTIONALwith no corresponding ORB_DECLARE) 
  25.  *                             thisfunction will return -1 and set errno to ENOENT. 
  26.  */  
  27. externint          orb_subscribe(conststruct orb_metadata *meta);  
订阅一个可选的订阅主题时,如果订阅主题不存在的话,将会失败。然而其他的所有订阅都会成功,并创建订阅主题,即使这并没有经发布器通告。这极大的降低了仔细安排系统启动顺序的需要。

对一项任务可以执行的订阅数目并没有具体的限制。

取消对一个订阅主题的订阅,使用如下的API

[cpp] view plain copy print?
  1. /** 
  2.  * Unsubscribe from a topic. 
  3.  * 
  4.  * @param handle     A handle returned from orb_subscribe. 
  5.  * @return           OK on success, ERROR otherwise with errno set accordingly. 
  6.  */  
  7. externint          orb_unsubscribe(int handle);  

从订阅主题中拷贝数据

订阅器并不是引用存储在ORB中的数据,或者与其他订阅器共享数据;而是在其请求下,将数据从ORB中拷贝出来存放在每一个订阅器对应的临时缓冲区中。这一拷贝就避免了锁定问题,并使得发布器和订阅器的API都变的简单。也允许订阅器及时根据自己的使用需求对数据进行修改。

当订阅器希望得到发布到订阅主题的最近数据拷贝时,使用如下的API

[cpp] view plain copy print?
  1. /** 
  2.  * Fetch data from a topic. 
  3.  * 
  4.  * @param meta                 The uORB metadata (usually fromthe ORB() macro) 
  5.  *                             forthe topic. 
  6.  * @param handle     A handle returned from orb_subscribe. 
  7.  * @param buffer     Pointer to the buffer receiving the data. 
  8.  * @return           OK on success, ERROR otherwise with errno set accordingly. 
  9.  */  
  10. externint          orb_copy(conststruct orb_metadata *meta,int handle,void*buffer);  

复制可以保证数据是最新发布的。

更新检查

在订阅器调用orb_copy函数后,可以使用如下的API检查从上一个时间开始,订阅主题是否接收到了新的发布。

[cpp] view plain copy print?
  1. /** 
  2.  * Check whether a topic has beenpublished to since the last orb_copy. 
  3.  * 
  4.  * This check can be used todetermine whether to copy from the topic when 
  5.  * not using poll(), or to avoid theoverhead of calling poll() when the 
  6.  * topic is likely to have updated. 
  7.  * 
  8.  * Updates are tracked on aper-handle basis; this call will continue to 
  9.  * return true until orb_copy iscalled using the same handle. This interface 
  10.  * should be preferred over callingorb_stat due to the race window between 
  11.  * stat and copy that can lead tomissed updates. 
  12.  * 
  13.  * @param handle     A handle returned from orb_subscribe. 
  14.  * @param updated    Set to true if the topic has been publishedsince the 
  15.  *                             lasttime it was copied using this handle. 
  16.  * @return           OK if the check was successful, ERROR otherwise with 
  17.  *                             errnoset accordingly. 
  18.  */  
  19. externint          orb_check(int handle, bool *updated);  

当订阅主题在通告之前,已经被订阅,这个API将会返回没有更新,直到订阅主题被通告。

发布时间戳

订阅器可以使用如下的API检查订阅主题最近一次发布的时间:

[cpp] view plain copy print?
  1. /** 
  2.  * Return the last time that thetopic was published. 
  3.  * 
  4.  * @param handle     A handle returned from orb_subscribe. 
  5.  * @param time                 Returns the time that the topicwas published, or zero if it has 
  6.  *                             neverbeen published/advertised. 
  7.  * @return           OK on success, ERROR otherwise with errno set accordingly. 
  8.  */  
  9. externint          orb_stat(int handle,uint64_t*time);  

当调用这个功能时,需要仔细检查,因为并不能保证在调用返回后订阅主题不会紧接着就发布。

等待更新

依赖于发布作为数据源的订阅器,其可以同时具有任意数量等待发布的订阅。这一功能通过使用 poll() 函数实现,与文件描述符等待数据具有相同的方式。这之所以可以工作,是因为订阅本身实际上也是文件描述符。

下面的例子展示的是,订阅器具有三个独立的订阅,等待到每一个的发布,当收到这些发布时进行回应。如果第二轮没有到任意订阅的更新,更新超时计数器,并发布给其他订阅器。

color_counter.h

[cpp] view plain copy print?
  1. ORB_DECLARE(color_red);  
  2. ORB_DECLARE(color_green);  
  3. ORB_DECLARE(color_blue);  
  4. ORB_DECLARE(color_timeouts);  
  5.    
  6. /* structure published to color_red, color_green, color_blue andcolor_timeouts */  
  7. struct color_update  
  8. {  
  9.           int number;  
  10. };  
  11. color_counter.c  
  12. #include <poll.h>  
  13.    
  14. ORB_DEFINE(color_timeouts,struct color_update);  
  15.    
  16. void  
  17. subscriber(void)  
  18. {  
  19.           int       sub_red, sub_green, sub_blue;  
  20.           int       pub_timeouts;  
  21.           int       timeouts=0;  
  22.           struct color_update cu;  
  23.    
  24.           /* subscribe to color topics */  
  25.           sub_red = orb_subscribe(ORB_ID(color_red));  
  26.           sub_green = orb_subscribe(ORB_ID(color_green));  
  27.           sub_blue = orb_subscribe(ORB_ID(color_blue));  
  28.    
  29.           /* advertise the timeout topic */  
  30.           cu.number=0;  
  31.           pub_timeouts = orb_advertise(ORB_ID(color_timeouts),&cu);  
  32.    
  33.           /* loop waiting for updates */  
  34.           for(;;){  
  35.    
  36.                     /* wait for updates or a 1-second timeout */  
  37.                     struct pollfd fds[3]={  
  38.                                { .fd= sub_red,   .events= POLLIN },  
  39.                                { .fd= sub_green, .events= POLLIN },  
  40.                                { .fd= sub_blue,  .events= POLLIN }  
  41.                     };  
  42.                     int ret= poll(fds,3,1000);  
  43.    
  44.                     /* check for a timeout */  
  45.                     if(ret==0){  
  46.                                puts("timeout");  
  47.                                cu.number=++timeouts;  
  48.                                orb_publish(ORB_ID(color_timeouts), pub_timeouts,&cu);  
  49.    
  50.                     /* check for color updates */  
  51.                     }else{  
  52.    
  53.                                if(fds[0].revents& POLLIN){  
  54.                                          orb_copy(ORB_ID(color_red), sub_red,&cu);  
  55.                                         printf("red is now %d\n", cu.number);  
  56.                                }  
  57.                                if(fds[1].revents& POLLIN){  
  58.                                          orb_copy(ORB_ID(color_green), sub_green,&cu);  
  59.                                         printf("green is now %d\n", cu.number);  
  60.                                }  
  61.                                if(fds[2].revents& POLLIN){  
  62.                                          orb_copy(ORB_ID(color_blue), sub_blue,&cu);  
  63.                                         printf("blue is now %d\n", cu.number);  
  64.                                }  
  65.                     }  
  66.           }  
  67. }  

限制更新的速率

订阅器可能希望限制在订阅主题上接收更新的速率,这可以由下面的API完成。

[cpp] view plain copy print?
  1. /** 
  2.  * Set the minimum interval betweenwhich updates are seen for a subscription. 
  3.  * 
  4.  * If this interval is set, thesubscriber will not see more than one update 
  5.  * within the period. 
  6.  * 
  7.  * Specifically, the first time anupdate is reported to the subscriber a timer 
  8.  * is started. The update willcontinue to be reported via poll and orb_check, but 
  9.  * once fetched via orb_copy anotherupdate will not be reported until the timer 
  10.  * expires. 
  11.  * 
  12.  * This feature can be used to pacea subscriber that is watching a topic that 
  13.  * would otherwise update tooquickly. 
  14.  * 
  15.  * @param handle     A handle returned from orb_subscribe. 
  16.  * @param interval   An interval period in milliseconds. 
  17.  * @return           OK on success, ERROR otherwise with ERRNO set accordingly. 
  18.  */  
  19. externint          orb_set_interval(int handle,unsigned interval);  

速率限制是具体到某一个订阅的,一个订阅主题可能具有多于一个的订阅器,分别具有不同的速率限制。



版权声明:本文为博主[翻译]文章,未经博主允许可以转载,注明博客出处:[http://blog.csdn.net/lkk05]