协程与Epoll的配合

时间:2021-03-02 09:45:12

想快速了解协程与网络调用的原来么,那么请赶紧关闭本页,因为下面都是在扯淡。

这几天是端午假期,第一天大算照着网上一大堆基于ucontext来写协程的文章自己也写一个简单的协程实现。于是第一天我就开始动手了,非常轻松愉快,毕竟是抄么。但是很多文章写到怎么用ucontext做切换就戛然而止了,很显然在我们日常协程用的比较多的网络应用中没有人会去做手工的协程切换,这些协程的切换调度其实都被封装在socket接口中,即在协程进行网络操作需要等待时调用yield使得当前协程挂起,在等待的时间到达时调用resume恢复已挂起的协程。这样我们就可以用同步的编程模式写出异步的网络程序。所以我打算把这一步也实现一下,当然这里比起用ucontext来讲还是麻烦一些,在结合epoll实现同步阻塞的read和write时调了一些时间。

在一天之内没有写完,假期么,早点回去看看电影也是应该的,顺便买点饮料饼干准备过夜。手上拿的东西比较多,进出租房的时候,一阵风把还没来得及关上的门‘喷’的一吹,过道里就传来东北房东的喊声。寄人篱下,又是深夜我知道一顿教训是没逃了,结果这逼没玩没了,我就不能忍了,搞得我故意关的,亏我平时进出都小心翼翼的都不是关而是把门‘合’上的。结果自然是我得搬走,毕竟房子是人家的。其实我倒是不太气,因为虽然这离公司不过两百米,但不到6平米的空间还包括了卫生间,这房东又住在出租的套间里,还时不时的抽烟,原来想着合同签了要反悔押金就要损失了。这回倒好,让我走,行,把押金退了就好,这货也答应。所以这个结果不知是双赢还是双输。

端午的第二天我就破天荒的走了2.5万步,奔波找房子,其实我是比较屌丝就是不想花太多钱在房租上,如果肯出个两千房子随便找,一大把。原来还不敢直接打出租房前贴的小广告电话,结果那天一口气打了二三十个,脾气好的没房子也会跟你说一句,脾气差的直接挂电话。这时候想想以前和同学一起租个套间真是好太多了,房东人也不错。

三天假期突然就到第三天了,先把租房的事情放一放,大不了睡公司,房子总是会有的,不就是个钱的问题。这天就用异步和协程切换把socket的同步阻塞读写给实现了一下,并且写了个echo server像是网络编程界的helloworld。好了talk is cheap,shou you the code!(需要使用-std=c++11编译选项)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <ucontext.h> #include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h> #include <map>
#include <list> #define MAX_ROUTINE 256
#define MAX_STACK_SIZE_KB 128
#define MAX_EVENT_SIZE 10240
enum { UNUSED = 0, IDLE = 1, RUNNING = 2 }; typedef void (*STD_ROUTINE_FUNC)(void); typedef struct {
ucontext_t ctx;
char stack[ MAX_STACK_SIZE_KB * 1024 ];
STD_ROUTINE_FUNC func;
int status;
int wait_fd;
int events;
} RoutineContext; typedef std::map<unsigned int, std::list<int> > TimeoutMap; typedef struct {
struct epoll_event events[MAX_ROUTINE];
RoutineContext routines[MAX_ROUTINE];
TimeoutMap timeout_map;
ucontext_t main;
int epoll_fd;
int running_id;
int routine_cnt;
} RoutineCenter; RoutineCenter routinecenter; void init() {
srand(time(NULL));
routinecenter.running_id = -1;
} void routine_wrap() {
int running_id = routinecenter.running_id;
if ( running_id < 0 ) {
puts("current context don't attach to any routine except main.");
return;
}
routinecenter.routines[running_id].func(); routinecenter.routines[running_id].status = UNUSED;
routinecenter.routine_cnt--;
} int create( STD_ROUTINE_FUNC routine_proc) {
int new_id = -1;
for (int i = 0; i < MAX_ROUTINE; i++) {
if ( routinecenter.routines[i].status == UNUSED ) {
new_id = i;
break;
}
} if ( new_id < 0 ) {
puts("max routine number reached. no more routine.");
return -1;
} ucontext_t* pctx = &(routinecenter.routines[new_id].ctx);
getcontext(pctx); pctx->uc_stack.ss_sp = routinecenter.routines[new_id].stack;
pctx->uc_stack.ss_size = MAX_STACK_SIZE_KB * 1024;
pctx->uc_stack.ss_flags = 0;
pctx->uc_link = &(routinecenter.main); makecontext(pctx, routine_wrap, 0); routinecenter.routines[new_id].status = IDLE;
routinecenter.routines[new_id].func = routine_proc;
routinecenter.routine_cnt++;
return new_id;
} int yield() {
if ( routinecenter.running_id < 0 ) {
puts("no routine running except main.");
return 0;
}
int running_id = routinecenter.running_id;
RoutineContext* info = &(routinecenter.routines[running_id]);
info->status = IDLE;
info->events = 0;
swapcontext( &(info->ctx), &(routinecenter.main) );
return 0;
} int resume(int id, int events = 0) {
if ( id < 0 || id >= MAX_ROUTINE ) {
puts("routine id out of bound.");
return -1;
}
int running_id = routinecenter.running_id;
if (id == running_id) {
puts("current routine is running already.");
return 0;
}
if (routinecenter.routines[id].status != IDLE) {
puts("target routine is not in idel status. can't resume");
return -1;
} routinecenter.running_id = id;
routinecenter.routines[id].status = RUNNING;
routinecenter.routines[id].events = events;
if (running_id < 0) {
// in main
swapcontext( &(routinecenter.main), &(routinecenter.routines[id].ctx));
routinecenter.running_id = -1;
} else {
// in other routine
routinecenter.routines[running_id].status = IDLE;
swapcontext( &(routinecenter.routines[running_id].ctx), &(routinecenter.routines[id].ctx) );
routinecenter.running_id = running_id;
}
return 0;
} int routine_id() { return routinecenter.running_id; } void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (ret < 0) {
perror("set nonblocking fail.");
exit(-1);
}
} void mod_event(int fd, int events, int op, int routine_id) {
struct epoll_event ev = {0};
if ( EPOLL_CTL_DEL != op ) {
ev.data.fd= routine_id;
routinecenter.routines[routine_id].wait_fd = fd;
}
ev.events = events; int ret = epoll_ctl(routinecenter.epoll_fd, op, fd, &ev);
if (ret < 0) {
if ( errno == EEXIST && op != EPOLL_CTL_DEL) {
epoll_ctl(routinecenter.epoll_fd, EPOLL_CTL_MOD, fd, &ev);
}
}
} int routine_read(int fd, char* buff, int size) {
mod_event(fd, EPOLLIN, EPOLL_CTL_ADD, routine_id());
while (!(routinecenter.routines[routine_id()].events & EPOLLIN)) {
yield();
}
while (routinecenter.routines[routine_id()].events & EPOLLIN) {
int need = size;
int readin = 0;
while (need > 0) {
int ret = read(fd, buff + readin, need);
if (ret <= 0) {
break;
} else {
readin += ret;
need -= ret;
}
}
if (readin == 0 && size != 0) {
yield();
continue;
} else {
mod_event(fd, EPOLLIN, EPOLL_CTL_DEL, routine_id());
}
return readin;
}
printf("routine[%d][%s]routine system ran out of order.\n", routine_id(), __func__);
return 0;
} int routine_write(int fd, char* buff, int size) {
mod_event(fd, EPOLLOUT, EPOLL_CTL_ADD, routine_id());
while (!(routinecenter.routines[routine_id()].events & EPOLLOUT)) {
yield();
}
while (routinecenter.routines[routine_id()].events & EPOLLOUT) {
int need = size;
int wrout = 0;
while (need > 0) {
int ret = write(fd, buff + wrout, need);
if (ret <= 0) {
break;
} else {
wrout += ret;
need -= ret;
}
}
if ( wrout == 0 && size != 0 ) {
yield();
continue;
} else {
mod_event(fd, EPOLLOUT, EPOLL_CTL_DEL, routine_id());
}
return wrout;
}
printf("routine[%d][%s]routine system ran out of order.\n", routine_id(), __func__);
return 0;
} void routine_delay_resume(int rid, int delay_sec) {
if (delay_sec <= 0) {
resume(rid);
return;
}
routinecenter.timeout_map[time(NULL) + delay_sec].push_back(rid);
} void routine_sleep(int time_sec) {
routine_delay_resume(routine_id(), time_sec);
yield();
} int routine_nearest_timeout() {
if (routinecenter.timeout_map.empty()) {
return 60 * 1000; // default epoll timeout
}
unsigned int now = time(NULL);
int diff = routinecenter.timeout_map.begin()->first - now;
return diff < 0 ? 0 : diff * 1000;
} void routine_resume_timeout() {
// printf("[epoll] process timeout\n");
if ( routinecenter.timeout_map.empty() ) {
return;
}
unsigned int timestamp = routinecenter.timeout_map.begin()->first; if (timestamp > time(NULL)) {
return;
} std::list<int>& routine_ids = routinecenter.timeout_map.begin()->second; for (int i : routine_ids) {
resume(i);
}
routinecenter.timeout_map.erase(timestamp);
} void routine_resume_event(int n) {
// printf("[epoll] process event\n");
for (int i = 0; i < n; i++) {
int rid = routinecenter.events[i].data.fd;
resume(rid, routinecenter.events[i].events);
}
} void create_routine_poll() {
routinecenter.epoll_fd = epoll_create1 (0); if (routinecenter.epoll_fd == -1) {
perror ("epoll_create");
exit(-1);
}
} void routine_poll() { for (;;) {
int n = epoll_wait (routinecenter.epoll_fd, routinecenter.events, MAX_EVENT_SIZE, routine_nearest_timeout());
// printf("[epoll] event_num:%d\n", n);
routine_resume_timeout();
routine_resume_event(n);
}
} void echo_server_routine() {
int conn_fd = routinecenter.routines[routine_id()].wait_fd; printf("routine[%d][%s] server start. conn_fd: %d\n", routine_id(), __func__, conn_fd); for (;;) {
printf("routine[%d][%s] loop start. conn_fd: %d\n", routine_id(), __func__, conn_fd);
char buf[512] = {0};
int n = 0; n = routine_read( conn_fd, buf, sizeof (buf) );
if (n < 0) {
perror("server read error.");
break;
}
buf[n] = '\0';
printf("routine[%d][%s] server read: %s", routine_id(), __func__, buf); unsigned int in_ts = time(NULL);
routine_sleep(1);
unsigned int out_ts= time(NULL); char obuf[512] = {0};
snprintf(obuf, sizeof(obuf), "%s rev_ts:%u sent_ts:%u\n", buf, in_ts, out_ts);
printf("routine[%d][%s] server write: %s", routine_id(), __func__, obuf); n = routine_write(conn_fd, obuf, strlen(obuf) + 1);
if (n < 0) {
perror("server write error.");
break;
}
}
printf("routine[%d][%s] server start. conn_fd: %d\n", routine_id(), __func__, conn_fd);
} void request_accept() {
for (;;) {
struct sockaddr_in addr = {0};
socklen_t slen = sizeof(addr);
int fd = accept(routinecenter.routines[routine_id()].wait_fd, (struct sockaddr*)&addr, &slen);
struct sockaddr_in peer = {0};
int ret = getpeername(fd, (struct sockaddr*)&peer, &slen);
if (ret < 0) {
perror("getpeername error.");
exit(-1);
}
printf("routine[%d][%s] accept from %s conn_fd:%d\n", routine_id(), __func__, inet_ntoa(peer.sin_addr), fd);
set_nonblocking(fd);
int rid = create( echo_server_routine );
routinecenter.routines[rid].wait_fd = fd; mod_event(fd, EPOLLIN, EPOLL_CTL_ADD, rid); resume(rid); yield();
}
} void bind_listen(unsigned short port) {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind( listen_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr) );
if (ret < 0) {
perror("bind fail.");
exit(-1);
}
ret = listen( listen_fd, 20 );
if (ret < 0) {
perror("listen fail.");
exit(-1);
}
printf("routine[%d] listen bind at port: %u\n", routine_id(), port);
set_nonblocking( listen_fd );
int rid = create( request_accept );
mod_event( listen_fd, EPOLLIN, EPOLL_CTL_ADD, rid );
} int main() {
init(); create_routine_poll(); bind_listen(55667);
bind_listen(55668);
bind_listen(55669); routine_delay_resume(create( [](){ printf("routine[%d] alarm fired\n", routine_id()); } ), 3); routine_poll(); puts("all routine exit"); return 0;
}

协程与Epoll的配合的更多相关文章

  1. Python之路-python&lpar;Queue队列、进程、Gevent协程、Select&bsol;Poll&bsol;Epoll异步IO与事件驱动&rpar;

    一.进程: 1.语法 2.进程间通讯 3.进程池 二.Gevent协程 三.Select\Poll\Epoll异步IO与事件驱动 一.进程: 1.语法 简单的启动线程语法 def run(name): ...

  2. 多进程、协程、事件驱动及select poll epoll

    目录 -多线程使用场景 -多进程 --简单的一个多进程例子 --进程间数据的交互实现方法 ---通过Queues和Pipe可以实现进程间数据的传递,但是不能实现数据的共享 ---Queues ---P ...

  3. Python学习笔记整理总结【网络编程】【线程&sol;进程&sol;协程&sol;IO多路模型&sol;select&sol;poll&sol;epoll&sol;selector】

    一.socket(单链接) 1.socket:应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口.在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socke ...

  4. Python 协程&sol;异步IO&sol;Select&bsol;Poll&bsol;Epoll异步IO与事件驱动

    1 Gevent 协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到 ...

  5. python之协程与IO操作

    协程 协程,又称微线程,纤程.英文名Coroutine. 协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用. 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B ...

  6. python学习道路&lpar;day11note&rpar;&lpar;协程&comma;同步与异步的性能区别&comma;url爬网页&comma;select&comma;RabbitMq&rpar;

    1.协程 #协程 又称微线程 是一种用户的轻量级线程 程序级别代码控制 就不用加机器 #不同函数 = 不同任务 A函数切到B函数没有进行cpu级别的切换,而是程序级别的切换就是协程 yelied #单 ...

  7. Python开发【第九章】:线程、进程和协程

    一.线程 线程是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务 1.t ...

  8. 关于协程的学习 &amp&semi; 线程栈默认10M

    先看的这篇文章:http://blog.csdn.net/qq910894904/article/details/41699541 以nginx为代表的事件驱动的异步server正在横扫天下,那么事件 ...

  9. python 自动化之路 day 10 协程、异步IO、队列、缓存

    本节内容 Gevent协程 Select\Poll\Epoll异步IO与事件驱动 RabbitMQ队列 Redis\Memcached缓存 Paramiko SSH Twsited网络框架 引子 到目 ...

随机推荐

  1. 深入理解Javascript--闭包

    原网站http://www.cnblogs.com/xiaoloulan/p/5980569.html 在了解闭包之前需要了解下作用域的工作原理作为基础,传送门. 闭包是一个老生常谈的问题,在面试中也 ...

  2. 2013 duilib入门简明教程 -- XML基础类&lpar;7&rpar;

    现在大家应该对XML描述界面不那么陌生了,那么我们做进一步介绍. 前面的教程我们写了很多代码,为的是让大家了解下基本流程,其实duilib已经对常用的操作做了很好的包装,正式使用时无需像前面的教程那样 ...

  3. 20161013001 DataGridView 数据转 DataTable

    DataTable dt2 =  GetDgvToTable(Form_CY_ProjectRequirements_D);    MessageBox.Show( dt2.Rows.Count.To ...

  4. 定时器的应用---中断方式---让8个LED灯,左右各4个来回亮

    定时器的应用---中断方式---让8个LED灯,左右各4个来回亮 /*************************** 中断方式 是主程序专注于其他的事情, 待定时器中断时才执行中断子程序. ** ...

  5. IIS本地部署项目出错

    今天,打开部署在本地IIS的项目发现出错了.报的错,跟没有连接网络一样的.我当时有点懵,过一会儿再静下心来,想想这是什么原因. 第一步,把所有部署的项目,都打开看了一下,方便找出对比. 发现,绑定了I ...

  6. 【原版的】PHP技术成长规划过程中猿人

    PHP程序猿的技术成长规划 作者:黑夜路人(2014/10/15) 依照了解的非常多PHP/LNMP程序猿的发展轨迹.结合个人经验体会,抽象出非常多程序猿对未来的迷漫,特别对技术学习的盲目和慌乱.简单 ...

  7. &period;NEL IL实现对象深拷贝

    对于深拷贝,通常的方法是将对象进行序列化,然后再反序化成为另一个对象.例如在*上有这样的解决办法:https://*.com/questions/785 ...

  8. 持续集成-Jenkins安装部署

    1. 安装JDK[java8] 1.1. 软件安装 [yun@mini05 software]# pwd /app/software [yun@mini05 software]# .0_112.tar ...

  9. Docker安装及常用命令

    修改机器名: [root@docker /]# hostnamectl set-hostname Docker 安装EPEL源: [root@docker /]# yum -y install epe ...

  10. 解决Android Studio出现Failed to open zip file&period; Gradle&&num;39&semi;s dependency cache may be corrupt的问题

    问题如下图所示: 解决: 修改 gradle-wrapper.properties里的gradle的版本,与之前没有报错的gradle版本一致.就可以了 比如我报这个错的时候 : distributi ...