进程/应用之间的通信(例如将传感器应用的传感器数据发送到姿态滤波应用)是PX4软件架构的关键部分。进程(通常又被叫做节点(node))通过命名为总线(buses)进行消息交换的过程被称作订阅主题(topics)。
在PX4中,一个订阅主题仅包含一个消息类型,比如vehicle_attitude订阅主题传输一个包含姿态结构(滚转、俯仰和偏航估计)的消息。
节点可以在总线/订阅主题上发布(publish)一个消息(发送数据)或者订阅(subscribe)一个总线/订阅主题(接收数据) 。应用并不知道在与谁通信,1个订阅主题可以有多个发布器和订阅器。
这种设计模式阻止了锁定的问题(locking issues),在机器人领域非常常见。为了使这更为高效,通常在总线上只有一个消息,并且不会有队列。
这种发布/订阅机制由微对象请求处理器microobject request broker (uORB)实现。
快速开始
这是一个简单但是完整的发布器(publisher) /订阅器(subscriber)组合,发布器发布一个叫做random_integer 的订阅主题,并使用随机的整数更新订阅主题。订阅器检查并打印这些更新。
topic.h
- /* declare the topic */
- ORB_DECLARE(random_integer);
- /* define the data structure that will be published where subscribers cansee it */
- struct random_integer_data{
- int r;
- };
publisher.c
- #include <topic.h>
- /* create topic metadata */
- ORB_DEFINE(random_integer);
- /* file handle that will be used for publishing */
- staticint topic_handle;
- int init()
- {
- /* generate the initial data for first publication*/
- struct random_integer_data rd={ .r= random(),};
- /* advertise the topic and make the initialpublication */
- topic_handle = orb_advertise(ORB_ID(random_integer),&rd);
- }
- int update_topic()
- {
- /* generate a new random number forpublication */
- struct random_integer_data rd={ .r= random(),};
- /* publish the new data structure */
- orb_publish(ORB_ID(random_integer), topic_handle,&rd);
- }
subscriber.c
- #include <topic.h>
- /* file handle that will be used forsubscribing */
- staticint topic_handle;
- int init()
- {
- /* subscribe to the topic */
- topic_handle = orb_subscribe(ORB_ID(random_integer));
- }
- voidcheck_topic()
- {
- bool updated;
- struct random_integer_data rd;
- /* check to see whether the topic has updatedsince the last time we read it */
- orb_check(topic_handle,&updated);
- if(updated){
- /* make a local copy of the updated datastructure */
- orb_copy(ORB_ID(random_integer), topic_handle,&rd);
- printf("Random integer is now %d\n", rd.r);
- }
- }
发布过程
发布过程包含三个独立却相关联的动作:定义订阅主题,通告订阅主题,并且发布更新。
定义订阅主题
作为一种在元件之间提供通用接口的方式,系统定义了许多标准的订阅主题。如果一个发布器希望使用某一标准的订阅主题和其相关的数据结构,不需要再做额外的工作。
常规订阅主题
为了定义一个常规的订阅主题,发布器需要为订阅器建一个可用的头文件(上面的topic.h)。在这个头文件里必须包含:
- 将订阅主题的名字作为参数的一个ORB_DECLARE() 宏实例。
- 一个用于描述将发布数据结构的结构定义。
订阅主题的名字应该是描述性的;PX4的命名规则是是使用下划线分开订阅主题的名字;组件的名字应该首先使用通用的词汇。
例如,原始传感器数据在sensors_raw订阅主题中进行发布。
除了头文件,发布器必须包含一个ORB_DEFINE()宏实例在源文件中,该文件可以在构建固件时被编译和链接(见publisher.c示例)。这个宏来构建由ORB使用的数据结构,从而唯一表示这个topic。
可选的订阅主题(optionaltopic)
如果一个由软件组件发布的订阅主题是可选的,并且可能不会出现在固件中,头文件可以使用 ORB_DECLARE_OPTIONAL()宏代替。以这种方式声明的订阅主题需要由发布器进行特别的处理,但是还有一些以下需要讨论的考虑,订阅器在处理可选订阅主题时必须要注意。
通告(advertise)订阅主题
在数据发布到一个订阅主题时,必须首先通告。发布器使用如下的API通告一个新的订阅主题:
- /**
- * Advertise as the publisher of atopic.
- *
- * This performs the initialadvertisement of a topic; it creates the topic
- * node in /obj if required andwrites the initial data.
- *
- * @param meta The uORB metadata (usually fromthe ORB_ID() macro)
- * forthe topic. topic uORB元数据(通常来自ORB_ID()宏)
- * @param data A pointer to the initial datato be published.指向将要publish初始数据的指针。
- * Fortopics published by interrupt handlers, the advertisement
- * mustbe performed from non-interrupt context.
- * @return ERROR on error, otherwise returns a handle
- * thatcan be used to publish to the topic.
- * Ifthe topic in question is not known (due to an
- * ORB_DEFINE_OPTIONALwith no corresponding ORB_DECLARE)
- * thisfunction will return -1 and set errno to ENOENT.
- */
- externint orb_advertise(conststruct orb_metadata *meta,constvoid*data);
通告同时为订阅主题发布初始数据。
API的meta参数是由ORB_DEFINE()宏产生的数据指针,通常使用ORB_ID() 宏提供,该宏执行从订阅主题名到元数据结构名的转换。
需要注意的是,可以从中断句柄中发布实时更新,但是通告订阅主题必须在常规线程的情况下。
多个发布器
一个发布器一次只能通告一个订阅主题,然而发布器可能会关闭订阅主题句柄(这是个文件句柄,可以简单的传递到close()函数),然后由另外一个发布器通告订阅主题。
发布更新
一个订阅主题被通告后,从通告那里返回的句柄可以使用如下的API,用于对订阅主题发布更新:
- /**
- * Publish new data to a topic.
- *
- * The data is atomically publishedto the topic and any waiting subscribers
- * will be notified. Subscribers that are not waiting can checkthe topic
- * for updates using orb_checkand/or orb_stat.
- *
- * @handle The handle returned from orb_advertise.
- * @param meta The uORB metadata (usually fromthe ORB() macro)
- * forthe topic.
- * @param data A pointer to the data to bepublished.
- * @return OK on success, ERROR otherwise with errno set accordingly.
- */
- externint orb_publish(conststruct orb_metadata *meta,int handle,constvoid*data);
需要注意的是ORB并不缓存多个更新,当订阅器检查订阅主题时,只会看到最近的一个更新。
订阅
订阅一个订阅主题需要如下:
- 一个ORB_DEFINE()或者ORB_DEFINE_OPTIONAL()宏(例如,在订阅器中包含的头文件)
- 在订阅主题中发布的数据结构定义。(通常来自相同的头文件)
假如这些条件都满足,订阅器使用下面的API来订阅一个订阅主题。
- /**
- * Subscribe to a topic.
- *
- * The returned value is a filedescriptor that can be passed to poll()
- * in order to wait for updates to atopic, as well as orb_read,
- * orb_check and orb_stat.
- *
- * Subscription will succeed even ifthe topic has not been advertised;
- * in this case the topic will havea timestamp of zero, it will never
- * signal a poll() event, checkingwill always return false and it cannot
- * be copied. When the topic issubsequently advertised, poll, check,
- * stat and copy calls will react tothe initial publication that is
- * performed as part of theadvertisement.
- *
- * Subscription will fail if thetopic is not known to the system, i.e.
- * there is nothing in the systemthat has defined the topic and thus it
- * can never be published.
- *
- * @param meta The uORB metadata (usually fromthe ORB_ID() macro)
- * forthe topic.
- * @return ERROR on error, otherwise returns a handle
- * thatcan be used to read and check the topic for updates.
- * Ifthe topic in question is not known (due to an
- * ORB_DEFINE_OPTIONALwith no corresponding ORB_DECLARE)
- * thisfunction will return -1 and set errno to ENOENT.
- */
- externint orb_subscribe(conststruct orb_metadata *meta);
对一项任务可以执行的订阅数目并没有具体的限制。
取消对一个订阅主题的订阅,使用如下的API:
- /**
- * Unsubscribe from a topic.
- *
- * @param handle A handle returned from orb_subscribe.
- * @return OK on success, ERROR otherwise with errno set accordingly.
- */
- externint orb_unsubscribe(int handle);
从订阅主题中拷贝数据
订阅器并不是引用存储在ORB中的数据,或者与其他订阅器共享数据;而是在其请求下,将数据从ORB中拷贝出来存放在每一个订阅器对应的临时缓冲区中。这一拷贝就避免了锁定问题,并使得发布器和订阅器的API都变的简单。也允许订阅器及时根据自己的使用需求对数据进行修改。
当订阅器希望得到发布到订阅主题的最近数据拷贝时,使用如下的API:
- /**
- * Fetch data from a topic.
- *
- * @param meta The uORB metadata (usually fromthe ORB() macro)
- * forthe topic.
- * @param handle A handle returned from orb_subscribe.
- * @param buffer Pointer to the buffer receiving the data.
- * @return OK on success, ERROR otherwise with errno set accordingly.
- */
- externint orb_copy(conststruct orb_metadata *meta,int handle,void*buffer);
复制可以保证数据是最新发布的。
更新检查
在订阅器调用orb_copy函数后,可以使用如下的API检查从上一个时间开始,订阅主题是否接收到了新的发布。
- /**
- * Check whether a topic has beenpublished to since the last orb_copy.
- *
- * This check can be used todetermine whether to copy from the topic when
- * not using poll(), or to avoid theoverhead of calling poll() when the
- * topic is likely to have updated.
- *
- * Updates are tracked on aper-handle basis; this call will continue to
- * return true until orb_copy iscalled using the same handle. This interface
- * should be preferred over callingorb_stat due to the race window between
- * stat and copy that can lead tomissed updates.
- *
- * @param handle A handle returned from orb_subscribe.
- * @param updated Set to true if the topic has been publishedsince the
- * lasttime it was copied using this handle.
- * @return OK if the check was successful, ERROR otherwise with
- * errnoset accordingly.
- */
- externint orb_check(int handle, bool *updated);
当订阅主题在通告之前,已经被订阅,这个API将会返回没有更新,直到订阅主题被通告。
发布时间戳
订阅器可以使用如下的API检查订阅主题最近一次发布的时间:
- /**
- * Return the last time that thetopic was published.
- *
- * @param handle A handle returned from orb_subscribe.
- * @param time Returns the time that the topicwas published, or zero if it has
- * neverbeen published/advertised.
- * @return OK on success, ERROR otherwise with errno set accordingly.
- */
- externint orb_stat(int handle,uint64_t*time);
当调用这个功能时,需要仔细检查,因为并不能保证在调用返回后订阅主题不会紧接着就发布。
等待更新
依赖于发布作为数据源的订阅器,其可以同时具有任意数量等待发布的订阅。这一功能通过使用 poll() 函数实现,与文件描述符等待数据具有相同的方式。这之所以可以工作,是因为订阅本身实际上也是文件描述符。
下面的例子展示的是,订阅器具有三个独立的订阅,等待到每一个的发布,当收到这些发布时进行回应。如果第二轮没有到任意订阅的更新,更新超时计数器,并发布给其他订阅器。
color_counter.h
- ORB_DECLARE(color_red);
- ORB_DECLARE(color_green);
- ORB_DECLARE(color_blue);
- ORB_DECLARE(color_timeouts);
- /* structure published to color_red, color_green, color_blue andcolor_timeouts */
- struct color_update
- {
- int number;
- };
- color_counter.c
- #include <poll.h>
- ORB_DEFINE(color_timeouts,struct color_update);
- void
- subscriber(void)
- {
- int sub_red, sub_green, sub_blue;
- int pub_timeouts;
- int timeouts=0;
- struct color_update cu;
- /* subscribe to color topics */
- sub_red = orb_subscribe(ORB_ID(color_red));
- sub_green = orb_subscribe(ORB_ID(color_green));
- sub_blue = orb_subscribe(ORB_ID(color_blue));
- /* advertise the timeout topic */
- cu.number=0;
- pub_timeouts = orb_advertise(ORB_ID(color_timeouts),&cu);
- /* loop waiting for updates */
- for(;;){
- /* wait for updates or a 1-second timeout */
- struct pollfd fds[3]={
- { .fd= sub_red, .events= POLLIN },
- { .fd= sub_green, .events= POLLIN },
- { .fd= sub_blue, .events= POLLIN }
- };
- int ret= poll(fds,3,1000);
- /* check for a timeout */
- if(ret==0){
- puts("timeout");
- cu.number=++timeouts;
- orb_publish(ORB_ID(color_timeouts), pub_timeouts,&cu);
- /* check for color updates */
- }else{
- if(fds[0].revents& POLLIN){
- orb_copy(ORB_ID(color_red), sub_red,&cu);
- printf("red is now %d\n", cu.number);
- }
- if(fds[1].revents& POLLIN){
- orb_copy(ORB_ID(color_green), sub_green,&cu);
- printf("green is now %d\n", cu.number);
- }
- if(fds[2].revents& POLLIN){
- orb_copy(ORB_ID(color_blue), sub_blue,&cu);
- printf("blue is now %d\n", cu.number);
- }
- }
- }
- }
限制更新的速率
订阅器可能希望限制在订阅主题上接收更新的速率,这可以由下面的API完成。
- /**
- * Set the minimum interval betweenwhich updates are seen for a subscription.
- *
- * If this interval is set, thesubscriber will not see more than one update
- * within the period.
- *
- * Specifically, the first time anupdate is reported to the subscriber a timer
- * is started. The update willcontinue to be reported via poll and orb_check, but
- * once fetched via orb_copy anotherupdate will not be reported until the timer
- * expires.
- *
- * This feature can be used to pacea subscriber that is watching a topic that
- * would otherwise update tooquickly.
- *
- * @param handle A handle returned from orb_subscribe.
- * @param interval An interval period in milliseconds.
- * @return OK on success, ERROR otherwise with ERRNO set accordingly.
- */
- externint orb_set_interval(int handle,unsigned interval);
速率限制是具体到某一个订阅的,一个订阅主题可能具有多于一个的订阅器,分别具有不同的速率限制。
版权声明:本文为博主[翻译]文章,未经博主允许可以转载,注明博客出处:[http://blog.csdn.net/lkk05]