目录
. 引言
. 环形队列的实现原理
. 环形队列编程实现
. 环形队列的内核实现
1. 引言
环形队列是在实际编程极为有用的数据结构,它有如下特点
. 它是一个首尾相连的FIFO(First In First Out 队列)的数据结构
. 采用数组的线性空间,数据组织简单
. 能很快知道队列是否满、或是否空
. 能以很快的速度、几乎无锁的方式(只在写满或为空的情况下需要加锁实现互斥同步)来存取数据
因为环形队列简单高效的原因,甚至在硬件都实现了环形队列,环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用程序大量交换数据,从硬件接收大量数据)均使用了环形队列
0x1: FIFO队列
First Input First Output的缩写,先入先出队列,这是一种传统的按序执行方法,先进入的指令先完成并引退,跟着才执行第二条指令
. FIFO队列不对报文进行分类,当报文进入接口的速度大于接口能发送的速度时,FIFO按报文到达接口的先后顺序让报文进入队列,同时,FIFO在队列的出口让报文按进队的顺序出队,先进的报文将先出队,后进的报文将后出队
. FIFO队列具有处理简单,开销小的优点。但FIFO不区分报文类型,采用尽力而为的转发模式,使对时间敏感的实时应用(如VoIP)的延迟得不到保证,关键业务的带宽也不能得到保证
. FIFO一般用于不同时钟域之间的数据传输,比如FIFO的一端是AD数据采集,另一端是计算机的PCI总线
) 假设其AD采集的速率为16位 100K SPS,那么每秒的数据量为100K×16bit=.6Mbps
) PCI总线的速度为33MHz,总线宽度32bit,其最大传输速率为1056Mbps
在两个不同的时钟域间就可以采用FIFO来作为数据缓冲。另外对于不同宽度的数据接口也可以用FIFO,例如单片机位8位数据输出,而DSP可能是16位数据输入,在单片机与DSP连接时就可以使用FIFO来达到数据匹配的目的
根据FIFO工作的时钟域,可以将FIFO分为两类
. 同步FIFO
同步FIFO是指读时钟和写时钟为同一个时钟。在时钟沿来临时同时发生读写操作,共同争用同一个FIFO队列 . 异步FIFO
异步FIFO是指读写时钟不一致,读写时钟是互相独立的,可以一定程度上提高FIFO的利用效率,同时也会带来同步的问题
FIFO设计的难点在于怎样判断FIFO的空/满状态。为了保证数据正确的写入或读出,而不发生溢出或读空的状态出现,必须保证FIFO在满的情况下,不能进行写操作。在空的状态下不能进行读操作。怎样判断FIFO的满/空就成了FIFO设计的核心问题,而循环队列是一个很好的算法思路
Relevant Link:
http://baike.baidu.com/view/736423.htm?fromtitle=FIFO&fromid=64838&type=syn
2. 环形队列的实现原理
内存上没有环形的结构,因此环形队列实上是数组的线性空间来实现。当数据到了尾部如何处理,它将转回到0位置来处理。这个的转回是通过取模操作来执行的
因此环列队列的是逻辑上将数组元素q[0]与q[MAXN-1]连接,形成一个存放队列的环形空间,为了方便读写,还要用数组下标来指明队列的读写位置
. head: head指向可以读的位置
. tail: tail指向可以写的位置
环形队列的关键是判断队列为空,还是为满
. 当tail追上head时,队列为写满
. 当head追上tail时,队列为读空
如何判断环形队列为空,为满有两种判断方法
. 附加一个标志位tag
) 当head赶上tail,队列空,则tag =
) 当tail赶上head,队列满,则tag = . 限制tail赶上head,即队尾结点与队首结点之间至少留有一个元素的空间
) 队列空: head == tail
) 队列满: (tail+) % MAXN == head
Relevant Link:
http://docs.linuxtone.org/ebooks/C&CPP/c/ch12s05.html
http://coolshell.cn/tag/disruptor
http://www.searchtb.com/2012/10/introduction_to_disruptor.html
http://www.cnblogs.com/haolujun/archive/2012/10/03/2710726.html
3. 环形队列编程实现
0x1: 附加标志环形队列Ring Buffer
ringq.h
#ifndef __RINGQ_H__
#define __RINGQ_H__ #ifdef __cplusplus
extern "C"
{
#endif #define QUEUE_MAX 20 /*
1. 初始化状态: q->head = q->tail = q->tag = 0;
2. 队列为空:(q->head == q->tail) && (q->tag == 0)
3. 队列为满: ((q->head == q->tail) && (q->tag == 1)) 1. 入队操作: 如队列不满,则写入: q->tail = (q->tail + 1) % q->size ;
2. 出队操作:如果队列不空,则从head处读出
3. 下一个可读的位置: q->head = (q->head + 1) % q->size
*/
typedef struct ringq
{
int head; /* 头部,出队列方向*/
int tail; /* 尾部,入队列方向*/
int tag ; /* 为空还是为满的标志位*/
int size ; /* 队列总尺寸 */
int space[QUEUE_MAX]; /* 队列空间 */
}RINGQ; /*
第一种设计方法:
当head == tail 时,tag = 0 为空,等于 = 1 为满
*/ extern int ringq_init(RINGQ * p_queue); extern int ringq_free(RINGQ * p_queue); /* 加入数据到队列 */
extern int ringq_push(RINGQ * p_queue,int data); /* 从队列取数据 */
extern int ringq_poll(RINGQ * p_queue,int *p_data); #define ringq_is_empty(q) ( (q->head == q->tail) && (q->tag == 0)) #define ringq_is_full(q) ( (q->head == q->tail) && (q->tag == 1)) #define print_ringq(q) printf("ring head %d,tail %d,tag %d\n", q->head,q->tail,q->tag);
#ifdef __cplusplus
}
#endif #endif /* __RINGQ_H__ */
ringq.c
#include <stdio.h>
#include "ringq.h" int ringq_init(RINGQ * p_queue)
{
p_queue->size = QUEUE_MAX ; p_queue->head = ;
p_queue->tail = ; p_queue->tag = ; return ;
} int ringq_free(RINGQ * p_queue)
{
return ;
} int ringq_push(RINGQ * p_queue,int data)
{
print_ringq(p_queue); if(ringq_is_full(p_queue))
{
printf("ringq is full\n");
return -;
} p_queue->space[p_queue->tail] = data; p_queue->tail = (p_queue->tail + ) % p_queue->size ; /* 这个时候一定队列满了*/
if(p_queue->tail == p_queue->head)
{
p_queue->tag = ;
} return p_queue->tag ;
} int ringq_poll(RINGQ * p_queue,int * p_data)
{
print_ringq(p_queue);
if(ringq_is_empty(p_queue))
{
printf("ringq is empty\n");
return -;
} *p_data = p_queue->space[p_queue->head]; p_queue->head = (p_queue->head + ) % p_queue->size ; /* 这个时候一定队列空了*/
if(p_queue->tail == p_queue->head)
{
p_queue->tag = ;
}
return p_queue->tag ;
} /* 测试第一种环形队列*/
void test5()
{
RINGQ rq, * p_queue;
int i,data; p_queue = &rq; ringq_init(p_queue); for(i=; i < QUEUE_MAX + ; i++)
{
ringq_push(p_queue,i+);
} if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); ringq_free(p_queue);
} /* 测试第一种环形队列,更加复杂的情况*/
void test6()
{
RINGQ rq, * p_queue;
int i,data; p_queue = &rq; ringq_init(p_queue);
ringq_push(p_queue,);
ringq_push(p_queue,); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); ringq_push(p_queue,);
ringq_push(p_queue,);
ringq_push(p_queue,); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); ringq_push(p_queue,); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); if(ringq_poll(p_queue,&data)>=)
PRINT_INT(data); ringq_free(p_queue);
} int mani()
{
test5();
test6(); return ;
}
0x2: 预留空间环形队列
ringq.h
#ifndef __RINGQ_H__
#define __RINGQ_H__ #ifdef __cplusplus
extern "C"
{
#endif #define RINGQ_MAX 20 /*
1. 初始化状态: q->head = q->tail = q->tag = 0;
2. 队列为空:(q->head == q->tail)
3. 队列为满: (((q->tail+1)%q->size) == q->head ) 1. 入队操作: 如队列不满,则写入: q->tail = (q->tail + 1) % q->size ;
2. 出队操作:如果队列不空,则从head处读出
3. 下一个可读的位置: q->head = (q->head + 1) % q->size
*/
typedef struct ringq
{
int head; /* 头部,出队列方向*/
int tail; /* 尾部,入队列方向*/
int size ; /* 队列总尺寸 */
int space[RINGQ_MAX]; /* 队列空间 */
}RINGQ; /*
取消tag .限制读与写之间至少要留一个空间
队列空 head == tail .
队列满是 (tail+1)%MAX == head
初始化是head = tail = 0;
*/ extern int ringq_init(RINGQ * p_ringq); extern int ringq_free(RINGQ * p_ringq); extern int ringq_push(RINGQ * p_ringq,int data); extern int ringq_poll(RINGQ * p_ringq,int * p_data); #define ringq_is_empty(q) (q->head == q->tail) #define ringq_is_full(q) (((q->tail+1)%q->size) == q->head ) #define print_ringq2(q,d) printf("ring head %d,tail %d,data %d\n", q->head,q->tail,d); #ifdef __cplusplus
}
#endif #endif /* __QUEUE_H__ */
ringq.c
#include <stdio.h>
#include "ringq.h" int ringq_init(RINGQ * p_ringq)
{
p_ringq->size = RINGQ_MAX; p_ringq->head = ;
p_ringq->tail = ; return p_ringq->size;
} int ringq_free(RINGQ * p_ringq)
{
return ;
} /* 往队列加入数据 */
int ringq_push(RINGQ * p_ringq,int data)
{
print_ringq(p_ringq,data); if(ringq_is_full(p_ringq))
{
printf("ringq is full,data %d\n",data);
return -;
} p_ringq->space[p_ringq->tail] = data;
p_ringq->tail = (p_ringq->tail + ) % p_ringq->size ; return p_ringq->tail ;
} int ringq_poll(RINGQ * p_ringq,int * p_data)
{
print_ringq(p_ringq,-);
if(ringq_is_empty(p_ringq))
{
printf("ringq is empty\n");
return -;
} *p_data = p_ringq->space[p_ringq->head];
p_ringq->head = (p_ringq->head + ) % p_ringq->size ; return p_ringq->head;
}
Relevant Link:
http://blog.csdn.net/sking002007/article/details/6584590
http://blog.chinaunix.net/uid-26669601-id-3737307.html
http://blog.sina.com.cn/s/blog_79c57c3d01019v8p.html
4. 环形队列的内核实现
在内核态实现无锁的读写环形队列
. 内核态Hook进程/线程会不断的通过自旋互斥锁,试图获取下一个可以写数据的4K内存页,请求会在下面两个情况下失败
) 环形队列写满: READ_FLAG == READING(),即头追上了尾巴,这种情况下,当前进程需要保持自旋,并在超时后丢包
) 其他进程正在请求当前内存页: Spin_lock已经被持有,这种情况下,当前进程需要等待上一个进程请求完成,并将current_page+1并释放Spin_lock后,方可继续请求下一个内存页
. 用户态进程负责从环形队列中读取数据,它有两种状态
) READING: 表示用户态程序接收到了内核态的SIGNAL信号,并处于数据读取状态,在这个状态下,内核态其他进程在完成了数据写之后不会发送SIGNAL信号(内核态进程会检查用户态进程是否处于READING状态)
) FLUSHING: 用户态进程已经完成了数据读取,准备好开始捕获信号
. 内核态进程在写完Page.n的数据之后,会将当前内存页的READ_FLAG设置为1,如果用户态进程处于FLUSH态,会向用户态进程发送一个SIGNAL信号,告诉用户态进程当前第n个内存页已经写完毕了,这个SIGNAL信号是通过SIGNAL BUFFER缓存的
//可能存在内核态进程并发地发送了多个SIGNAL
. 用户态进程接收到SIGNAL信号的通知后,通过SetREADING将自己的状态切换为READING态,立即开始从环形队列的开头开始逐页读数据,在没有完成读数据之前,内核态进程不会再发送SIGNAL
. 用户态进程会从环形队列开头逐页检查当前页是否可读(READ_FLAG=),在读取完当前Page页后将当前页的READ_FLAG设置为0
. 用户态进程会不间断地一直往下读,直到遇到某个内存页的READ_FLAG=,说明当前内核态挤压的数据已经全部读取完
/*
这个逻辑可以实现几乎无锁的异步式的共享内存读写,在高负载IO的情况下可以获得很好的通信性能,但这里面存在一个问题,简单描述如下
1. 内核态有多个进程同时请求内存页进行写数据,在互斥锁的控制下它们分别获取了紧挨着的一系列的内存页进行写入
2. 其中有一个进程率先完成了写数据,在设置了READ_FLAG之后,向用户态发送信号,但此时其他进程还没有完成数据的写入
3. 所以用户态进程在遍历整个环形队列的时候,只会读取那块已经完成了数据写入的内存页,而不会读取其他未完成数据写入的内存页
4. 如果在这个期间,其他进程完成了数据的写入,但是因为用户态进程处于READING态导致信号不会被发送(即刚刚好用户态进程遍历过它之后它才完成写入),则对应的内存页就不会被用户态进程读取了,而只能等到下一次其他进程发送SIGNAL信号,才有机会被读取
5. 在串行Hook的模式下,这种数据的积压会造成Hook超时,而最终丢包
*/
. 为了解决这个问题,用户态进程采取了"double循环遍历",即完成第一次遍历之后,通过SetFLUSHING将当前状态设置了FLUSH,然后再次遍历一次环形队列,这大大降低了数据超时的发生
aaarticlea/png;base64," alt="" />
Copyright (c) 2015 LittleHann All rights reserved
Circular Queue Implementation Principle的更多相关文章
-
[Swift]LeetCode622. 设计循环队列 | Design Circular Queue
Design your implementation of the circular queue. The circular queue is a linear data structure in w ...
-
[LeetCode] Design Circular Queue 设计环形队列
Design your implementation of the circular queue. The circular queue is a linear data structure in w ...
-
LeetCode 622. Design Circular Queue
原题链接在这里:https://leetcode.com/problems/design-circular-queue/ 题目: Design your implementation of the c ...
-
[LeetCode] 622.Design Circular Queue 设计环形队列
Design your implementation of the circular queue. The circular queue is a linear data structure in w ...
-
Design Circular Queue
Design your implementation of the circular queue. The circular queue is a linear data structure in w ...
-
【leetcode】622. Design Circular Queue
题目如下: Design your implementation of the circular queue. The circular queue is a linear data structur ...
-
C#LeetCode刷题之#622-设计循环队列​​​​​​​(Design Circular Queue)
问题 该文章的最新版本已迁移至个人博客[比特飞],单击链接 https://www.byteflying.com/archives/4126 访问. 设计你的循环队列实现. 循环队列是一种线性数据结构 ...
-
【LeetCode】622. Design Circular Queue 解题报告(Python & C++)
作者: 负雪明烛 id: fuxuemingzhu 个人博客: http://fuxuemingzhu.cn/ 目录 题目描述 题目大意 解题方法 用直的代替弯的 数组循环利用 日期 题目地址:htt ...
-
LeetCode 622:设计循环队列 Design Circular Queue
LeetCode 622:设计循环队列 Design Circular Queue 首先来看看队列这种数据结构: 队列:先入先出的数据结构 在 FIFO 数据结构中,将首先处理添加到队列中的第一个元素 ...
随机推荐
-
webapi - 使用依赖注入
本篇将要和大家分享的是webapi中如何使用依赖注入,依赖注入这个东西在接口中常用,实际工作中也用的比较频繁,因此这里分享两种在api中依赖注入的方式Ninject和Unity:由于快过年这段时间打算 ...
-
Android开发重点难点1:RelativeLayout(相对布局)详解
前言 啦啦啦~博主又推出了一个新的系列啦~ 之前的Android开发系列主要以完成实验的过程为主,经常会综合许多知识来写,所以难免会有知识点的交杂,给人一种混乱的感觉. 所以博主推出“重点难点”系列, ...
-
使用Angular2理由
1. 组件化 组件化编程是web 发展的一个趋势,Angular2提供高效简单的组件开发方式,使程序开发更加关注业务逻辑的实现,而不用关心如何加载组件和模块,如何引用及依赖注入的实现等. 如下代码所示 ...
-
字符串匹配的Boyer-Moore算法
作者: 阮一峰 http://www.ruanyifeng.com/blog/2013/05/boyer-moore_string_search_algorithm.html 上一篇文章,我介绍 ...
-
Cordova webapp实战开发:(5)如何写一个Andorid下自动更新的插件?
在 <Cordova webapp实战开发:(4)Android环境搭建>中我们搭建好了开发环境,也给大家布置了调用插件的预习作业,做得如何了呢?今天我们来学一下如何自己从头建立一个And ...
-
4 个最好的 Linux 引导程序
导读 当你打开你的机器,开机自检(POST)成功完成后,BIOS(基本输入输出系统)立即定位所配置的引导介质,并从 MBR(主引导记录)或 GUID(全局唯一标识符)分区表读取一些命令,这是引导介质的 ...
-
Android 中 SQLite 数据库的查看
当 SQLite 数据库创建完成后,如何查看数据库的内容呢?如果直接使用 File Explorer 查看,最多只能看到 database 目录下出现了一个 BookStore.db 文件,Book ...
-
iOS界面的绘制和渲染
界面的绘制和渲染 UIView是如何到显示的屏幕上的. 这件事要从RunLoop开始,RunLoop是一个60fps的回调,也就是说每16.7ms绘制一次屏幕,也就是我们需要在这个时间内完成view的 ...
-
iOS: 使用CGContextRef,CGPath和UIBezierPath来绘画
这三种东西:CGContextRef,CGPath和UIBezierPath.本质上都是一样的,都是使用Quartz来绘画.只不过把绘图操作暴露在不同的API层面上,在具体实现上,当然也会有一些细小的 ...
-
【Zookeeper】源码分析之网络通信(二)
一.前言 前面介绍了ServerCnxn,下面开始学习NIOServerCnxn. 二.NIOServerCnxn源码分析 2.1 类的继承关系 public class NIOServerCnxn ...