python之协程与IO操作

时间:2022-09-07 08:50:38

协程

协程,又称微线程,纤程。英文名Coroutine。

协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。

所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程可以被认为是一种用户空间线程,与传统的抢占式线程相比,有2个主要的优点:

  • 与线程不同,协程是自己主动让出CPU,并交付他期望的下一个协程运行,而不是在任何时候都有可能被系统调度打断。因此协程的使用更加清晰易懂,并且多数情况下不需要锁机制。
  • 与线程相比,协程的切换由程序控制,发生在用户空间而非内核空间,因此切换的代价非常的小。
  • 某种意义上,协程与线程的关系类似与线程与进程的关系,多个协程会在同一个线程的上下文之中运行。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

进程、线程和协程的区别

进程:

进程之间不共享任何状态,进程的调度由操作系统完成,每个进程都有自己独立的内存空间,进程间通讯主要是通过信号传递的方式来实现的,实现方式有多种,信号量、管道、事件等,任何一种方式的通讯效率都需要过内核,导致通讯效率比较低。由于是独立的内存空间,上下文切换的时候需要保存先调用栈的信息、cpu各寄存器的信息、虚拟内存、以及打开的相关句柄等信息,所以导致上下文进程间切换开销很大,通讯麻烦。

线程:

线程之间共享变量,解决了通讯麻烦的问题,但是对于变量的访问需要锁,线程的调度主要也是有操作系统完成,一个进程可以拥有多个线程,但是其中每个线程会共享父进程像操作系统申请资源,这个包括虚拟内存、文件等,由于是共享资源,所以创建线程所需要的系统资源占用比进程小很多,相应的可创建的线程数量也变得相对多很多。线程时间的通讯除了可以使用进程之间通讯的方式以外还可以通过共享内存的方式进行通信,所以这个速度比通过内核要快很多。另外在调度方面也是由于内存是共享的,所以上下文切换的时候需要保存的东西就像对少一些,这样一来上下文的切换也变得高效。

协程:

协程的调度完全由用户控制,一个线程可以有多个协程,用户创建了几个线程,然后每个线程都是循环按照指定的任务清单顺序完成不同的任务,当任务被堵塞的时候执行下一个任务,当恢复的时候再回来执行这个任务,任务之间的切换只需要保存每个任务的上下文内容,就像直接操作栈一样的,这样就完全没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快;另外协程还需要保证是非堵塞的且没有相互依赖,协程基本上不能同步通讯,多采用一步的消息通讯,效率比较高。

进程、线程与协程

  从硬件发展来看,从最初的单核单CPU,到单核多CPU,多核多CPU,似乎已经到了极限了,但是单核CPU性能却还在不断提升。server端也在不断的发展变化。如果将程序分为IO密集型应用和CPU密集型应用,二者的server的发展如下:

    IO密集型应用: 多进程->多线程->事件驱动->协程
    CPU密集型应用:多进程-->多线程

  调度和切换的时间:进程   >   线程   >  协程

不需要实现复杂的内存共享且需利用多cpu,用多进程;实现复杂的内存共享及IO密集型应用:多线程或协程;实现复杂的内存共享及CPU密集型应用:协程

python之协程与IO操作

网络编程模型

我们首先来简单回顾一下一些常用的网络编程模型。网络编程模型可以大体的分为同步模型和异步模型两类。

  • 同步模型:

同步模型使用阻塞IO模式,在阻塞IO模式下调用read等IO函数时会阻塞线程直到IO完成或失败。

同步模型的典型代表是thread_per_connection模型,每当阻塞在主线程上的accept调用返回时则创建一个新的线程去服务于新的socket的读/写。这种模型的优点是程序逻辑简洁,符合人的思维;缺点是可伸缩性收到线程数的限制,当连接越来越多时,线程也越来越多,频繁的线程切换会严重拖累性能,同时不得不处理多线程同步的问题。

  • 异步模型:

异步模型一般使用非阻塞IO模式,并配合epoll/select/poll等多路复用机制。在非阻塞模式下调用read,如果没有数据可读则立即返回,并通知用户没有可读(EAGAIN/EWOULDBLOCK),而非阻塞当前线程。异步模型可以使一个线程同时服务于多个IO对象。

异步模型的典型代表是reactor模型。在reactor模型中,我们将所有要处理的IO事件注册到一个中心的IO多路复用器中(一般为epoll/select/poll),同时主线程阻塞在多路复用器上。一旦有IO事件到来或者就绪,多路复用器返回并将对应的IO事件分发到对应的处理器(即回调函数)中,最后处理器调用read/write函数来进行IO操作。

异步模型的特点是性能和可伸缩性比同步模型要好很多,但是其结构复杂,不易于编写和维护。在异步模型中,IO之前的代码(IO任务的提交者)和IO之后的处理代码(回调函数)是割裂开来的。

协程与网络编程

协程的出现出现为克服同步模型和异步模型的缺点,并结合他们的优点提供了可能:

现在假设我们有3个协程A,B,C分别要进行数次IO操作。这3个协程运行在同一个调度器或者说线程的上下文中,并依次使用CPU。调度器在其内部维护了一个多路复用器(epoll/select/poll)。

协程A首先运行,当它执行到一个IO操作,但该IO操作并没有立即就绪时,A将该IO事件注册到调度器中,并主动放弃CPU。这时调度器将B切换到CPU上开始执行,同样,当它碰到一个IO操作的时候将IO事件注册到调度器中,并主动放弃CPU。调度器将C切换到cpu上开始执行。当所有协程都被“阻塞”后,调度器检查注册的IO事件是否发生或就绪。假设此时协程B注册的IO时间已经就绪,调度器将恢复B的执行,B将从上次放弃CPU的地方接着向下运行。A和C同理。
这样,对于每一个协程来说,它是同步的模型;但是对于整个应用程序来说,它是异步的模型。

编程范式

编程范式(Programming Paradigm)是某种编程语言典型的编程风格或者说是编程方式。随着编程方法学和软件工程研究的深入,特别是OO思想的普及,范式(Paradigm)以及编程范式等术语渐渐出现在人们面前。面向对象编程(OOP)常常被誉为是一种革命性的思想,正因为它不同于其他的各种编程范式。编程范式也许是学习任何一门编程语言时要理解的最重要的术语。

托马斯.库恩提出“科学的革命”的范式论之后,Robert Floyd在1979年图灵奖的颁奖演说中使用了编程范式一词。编程范式一般包括三个方面,以OOP为例:

  1. 学科的逻辑体系——规则范式:如类/对象、继承、动态绑定、方法改写、对象替换等等机制。
  2. 心理认知因素——心理范式:按照面向对象编程之父Alan Kay的观点,“计算就是模拟”。OO范式极其重视隐喻(metaphor)的价值,通过拟人化,按照自然的方式模拟自然。
  3. 自然观/世界观——观念范式:强调程序的组织技术,视程序为松散耦合的对象/类的集合,以继承机制将类组织成一个层次结构,把程序运行视为相互服务的对象们之间的对话。

简单的说,编程范式是程序员看待程序应该具有的观点。

为了进一步加深对编程范式的认识,这里介绍几种最常见的编程范式。

需要再次提醒注意的是:编程范式是编程语言的一种分类方式,它并不针对某种编程语言。就编程语言而言,一种编程语言也可以适用多种编程范式。

过程化(命令式)编程

过程化编程,也被称为命令式编程,应该是最原始的、也是我们最熟悉的一种传统的编程方式。从本质上讲,它是“冯.诺伊曼机“运行机制的抽象,它的编程思维方式源于计算机指令的顺序排列。

(也就是说:过程化语言模拟的是计算机机器的系统结构,而并不是基于语言的使用者的个人能力和倾向。这一点我们应该都很清楚,比如:我们最早曾经使用过的单片机的汇编语言。)

过程化编程的步骤是:

首先,我们必须将待解问题的解决方案抽象为一系列概念化的步骤。然后通过编程的方式将这些步骤转化为程序指令集(算法),而这些指令按照一定的顺序排列,用来说明如何执行一个任务或解决一个问题。这就意味着,程序员必须要知道程序要完成什么,并且告诉计算机如何来进行所需的计算工作,包括每个细节操作。简言之,就是将计算机看作一个善始善终服从命令的装置。

所以在过程化编程中,把待解问题规范化、抽象为某种算法是解决问题的关键步骤。其次,才是编写具体算法和完成相应的算法实现问题的正确解决。当然,程序员对待解问题的抽象能力也是非常重要的因素,但这本身已经与编程语言无关了。

程序流程图是过程化语言进行程序编写的有效辅助手段。

尽管现存的计算机编程语言很多,但是人们把所有支持过程化编程范式的编程语言都被归纳为过程化编程语言。例如机器语言、汇编语言、BASIC、COBOL、C 、FORTRAN、语言等等许多第三代编程语言都被归纳为过程化语言。

过程化语言特别适合解决线性(或者说按部就班)的算法问题。它强调“自上而下(自顶向下)”“精益求精”的设计方式。这种方式非常类似我们的工作和生活方式,因为我们的日常活动都是按部就班的顺序进行的。

过程化语言趋向于开发运行较快且对系统资源利用率较高的程序。过程化语言非常的灵活并强大,同时有许多经典应用范例,这使得程序员可以用它来解决多种问题。

过程化语言的不足之处就是它不适合某些种类问题的解决,例如那些非结构化的具有复杂算法的问题。问题出现在,过程化语言必须对一个算法加以详尽的说明,并且其中还要包括执行这些指令或语句的顺序。实际上,给那些非结构化的具有复杂算法的问题给出详尽的算法是极其困难的。

广泛引起争议和讨论的地方是:无条件分支,或goto语句,它是大多数过程式编程语言的组成部分,反对者声称:goto语句可能被无限地滥用;它给程序设计提供了制造混 乱的机会。目前达成的共识是将它保留在大多数语言中,对于它所具有的危险性,应该通过程序设计的规定将其最小化。

事件驱动编程

其实,基于事件驱动的程序设计在图形用户界面(GUI)出现很久前就已经被应用于程序设计中,可是只有当图形用户界面广泛流行时,它才逐渐形演变为一种广泛使用的程序设计模式。

在过程式的程序设计中,代码本身就给出了程序执行的顺序,尽管执行顺序可能会受到程序输入数据的影响。

在事件驱动的程序设计中,程序中的许多部分可能在完全不可预料的时刻被执行。往往这些程序的执行是由用户与正在执行的程序的互动激发所致。

  • 事件。就是通知某个特定的事情已经发生(事件发生具有随机性)。
  • 事件与轮询。轮询的行为是不断地观察和判断,是一种无休止的行为方式。而事件是静静地等待事情的发生。事实上,在Windows出现之前,采用鼠标输入字符模式的PC应用程序必须进行串行轮询,并以这种方式来查询和响应不同的用户操做。
  • 事件处理器。是对事件做出响应时所执行的一段程序代码。事件处理器使得程序能够对于用户的行为做出反映。

事件驱动常常用于用户与程序的交互,通过图形用户接口(鼠标、键盘、触摸板)进行交互式的互动。当然,也可以用于异常的处理和响应用户自定义的事件等等。

事件的异常处理比用户交互更复杂。

事件驱动不仅仅局限在GUI编程应用。但是实现事件驱动我们还需要考虑更多的实际问题,如:事件定义、事件触发、事件转化、事件合并、事件排队、事件分派、事件处理、事 件连带等等。

其实,到目前为止,我们还没有找到有关纯事件驱动编程的语言和类似的开发环境。所有关于事件驱动的资料都是基于GUI事件的。

属于事件驱动的编程语言有:VB、C#、Java(Java Swing的GUI)等。它们所涉及的事件绝大多数都是GUI事件。

面向对象编程

过程化范式要求程序员用按部就班的算法看待每个问题。很显然,并不是每个问题都适合这种过程化的思维方式。这也就导致了其它程序设计范式出现,包括我们现在介绍的面向对象的程序设计范式。

面向对象的程序设计模式已经出现二十多年,经过这些年的发展,它的设计思想和设计模式已经稳定的进入编程语言的主流。来自TIOBE Programming Community2010年11月份编程语言排名的前三名Java、C、C++中,Java和C++都是面向对象的编程语言。

面向对象的程序设计包括了三个基本概念:封装性、继承性、多态性。面向对象的程序语言通过类、方法、对象和消息传递,来支持面向对象的程序设计范式。

1. 对象

世间万事万物都是对象。

面向对象的程序设计的抽象机制是将待解问题抽象为面向对象的程序中的对象。利用封装使每个对象都拥有个体的身份。程序便是成堆的对象,彼此通过消息的传递,请求其它对象 进行工作。

2. 类

每个对象都是其类中的一个实体。

物以类聚——就是说明:类是相似对象的集合。类中的对象可以接受相同的消息。换句话说:类包含和描述了“具有共同特性(数据元素)和共同行为(功能)”的一组对象。

比如:苹果、梨、橘子等等对象都属于水果类。

3. 封装

封装(有时也被称为信息隐藏)就是把数据和行为结合在一个包中,并对对象的使用者隐藏数据的实现过程。信息隐藏是面向对象编程的基本原则,而封装是实现这一原则的一种方 式。

封装使对象呈现出“黑盒子”特性,这是对象再利用和实现可靠性的关键步骤。

4. 接口

每个对象都有接口。接口不是类,而是对符合接口需求的类所作的一套规范。接口说明类应该做什么但不指定如何作的方法。一个类可以有一个或多个接口。

5. 方法

方法决定了某个对象究竟能够接受什么样的消息。面向对象的设计有时也会简单地归纳为“将消息发送给对象”。

6. 继承

继承的思想就是允许在已存在类的基础上构建新的类。一个子类能够继承父类的所有成员,包括属性和方法。

继承的主要作用:通过实现继承完成代码重用;通过接口继承完成代码被重用。继承是一种规范的技巧,而不是一种实现的技巧。

7. 多态

多态提供了“接口与实现分离”。多态不但能改善程序的组织架构及可读性,更利于开发出“可扩充”的程序。

继承是多态的基础。多态是继承的目的。

合理的运用基于类继承的多态、基于接口继承的多态和基于模版的多态,能增强程序的简洁性、灵活性、可维护性、可重用性和可扩展性。

面向对象技术一方面借鉴了哲学、心理学、生物学的思考方式,另一方面,它是建立在其他编程技术之上的,是以前的编程思想的自然产物。

如果说结构化软件设计是将函数式编程技术应用到命令式语言中进行程序设计,面向对象编程不过是将函数式模型应用到命令式程序中的另一途径,此时,模块进步为对象,过程龟缩到class的成员方法中。OOP的很多技术——抽象数据类型、信息隐藏、接口与实现分离、对象生成功能、消息传递机制等等,很多东西就是结构化软件设计所拥有的、或者在其他编程语言中单独出现。但只有在面向对象语言中,他们才共同出现,以一种独特的合作方式互相协作、互相补充。

编程范式 = 语感

知识的学习有几种方式:一种靠记忆,一种靠练习,一种靠培养。就拿英语学习来说吧,学单词,单靠记忆即可;学句型、语法,光记忆是不够的,须要勤加练习方可熟能生巧;而要讲出地道的英语,光记忆和练习是远远不够的。从小学到大学,甚至博士毕业,除了英语类专业的学生外,大多数人英语练了一二十年,水平如何?不客气但很客观地说:一个字,烂。

原因只有一个,那就是国内的英语教学方式严重失策。教学总是围绕单词、词组、句型、语法转,缺乏对语感的重视和培养,导致学生只会‘中式英语’。同样道理,一个惯用C语言编程的人也许很快就能写一些C++程序,但如果他只注重C++的语法而不注重培养OOP 的语感,那么写出的程序一定是‘C 式C++’。与其如此,倒不如直接用C 呢。”

一句话:学习编程范式能增强编程语言的语感。

语感是一个人对语言的敏锐感知力,反映了他在语言方面的整体上的直觉把握能力。语感强者,能听弦外之音,能说双关之语,能读隽永之作,能写晓畅之文。这是一种综合的素质和修养,其重要性是不言而喻的。那么如何培养语感呢?普通的学习和训练固不可少,但如果忽视语言背后的文化背景和思维方式,终究只是缘木求鱼。编程范式正体现了编程的思维方式,因而是培养编程语言的语感的关键。

语感有了,那些设计模式、框架,甚至架构,等看似神秘高深的东西,也会自然而然地来了。

使用yield实现协程操作例子

import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1) def producer(): r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n +=1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()

符合什么条件就能称之为协程:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

基于上面这4点定义,我们刚才用yield实现的程并不能算是合格的线程.

Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

greelet指的是使用一个任务调度器和一些生成器或者协程实现协作式用户空间多线程的一种伪并发机制,即所谓的微线程。

greelet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。

网络框架的几种基本的网络I/O模型:

阻塞式单线程:这是最基本的I/O模型,只有在处理完一个请求之后才会处理下一个请求。它的缺点是效能差,如果有请求阻塞住,会让服务无法继续接受请求。但是这种模型编写代码相对简单,在应对访问量不大的情况时是非常适合的。

阻塞式多线程:针对于单线程接受请求量有限的缺点,一个很自然的想法就是给每一个请求开一个线程去处理。这样做的好处是能够接受更多的请求,缺点是在线程产生到一定数量之后,进程之间需要大量进行切换上下文的操作,会占用CPU大量的时间,不过这样处理的话编写代码的难道稍高于单进程的情况。

非阻塞式事件驱动:为了解决多线程的问题,有一种做法是利用一个循环来检查是否有网络IO的事件发生,以便决定如何来进行处理(reactor设计模式)。这样的做的好处是进一步降低了CPU的资源消耗。缺点是这样做会让程序难以编写,因为请求接受后的处理过程由reactor来决定,使得程序的执行流程难以把握。当接受到一个请求后如果涉及到阻塞的操作,这个请求的处理就会停下来去接受另一个请求,程序执行的流程不会像线性程序那样直观。twisted框架就是应用这种IO模型的典型例子。

非阻塞式Coroutine(协程):这个模式是为了解决事件驱动模型执行流程不直观的问题,它在本质上也是事件驱动的,加入了Coroutine的概念。

与线程/进程的区别

线程是抢占式的调度,多个线程并行执行,抢占共同的系统资源;而微线程是协同式的调度。

其实greenlet不是一种真正的并发机制,而是在同一线程内,在不同函数的执行代码块之间切换,实施“你运行一会、我运行一会”,并且在进行切换时必须指定何时切换以及切换到哪。greenlet的接口是比较简单易用的,但是使用greenlet时的思考方式与其他并发方案存在一定区别:

1. 线程/进程模型在大逻辑上通常从并发角度开始考虑,把能够并行处理的并且值得并行处理的任务分离出来,在不同的线程/进程下运行,然后考虑分离过程可能造成哪些互斥、冲突问题,将互斥的资源加锁保护来保证并发处理的正确性。

2. greenlet则是要求从避免阻塞的角度来进行开发,当出现阻塞时,就显式切换到另一段没有被阻塞的代码段执行,直到原先的阻塞状况消失以后,再人工切换回原来的代码段继续处理。因此,greenlet本质是一种合理安排了的 串行 。

3. greenlet本质是串行,因此在没有进行显式切换时,代码的其他部分是无法被执行到的,如果要避免代码长时间占用运算资源造成程序假死,那么还是要将greenlet与线程/进程机制结合使用(每个线程、进程下都可以建立多个greenlet,但是跨线程/进程时greenlet之间无法切换或通讯)。

使用

一个 “greenlet” 是一个很小的独立微线程。可以把它想像成一个堆栈帧,栈底是初始调用,而栈顶是当前greenlet的暂停位置。你使用greenlet创建一堆这样的堆栈,然后在他们之间跳转执行。跳转不是绝对的:一个greenlet必须选择跳转到选择好的另一个greenlet,这会让前一个挂起,而后一个恢复。两 个greenlet之间的跳转称为 切换(switch) 。

当你创建一个greenlet,它得到一个初始化过的空堆栈;当你第一次切换到它,他会启动指定的函数,然后切换跳出greenlet。当最终栈底 函数结束时,greenlet的堆栈又编程空的了,而greenlet也就死掉了。greenlet也会因为一个未捕捉的异常死掉。

示例:来自官方文档示例
from greenlet import greenlet
def test1():
print 12
gr2.switch()
print 34
def test2():
print 56
gr1.switch()
print 78
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
最后一行跳转到 test1() ,它打印12,然后跳转到 test2() ,打印56,然后跳转回 test1() ,打印34,然后 test1() 就结束,gr1死掉。这时执行会回到原来的 gr1.switch() 调用。注意,78是不会被打印的,因为gr1已死,不会再切换。

基于greenlet的框架
# -*- coding:utf-8 -*-

from greenlet import greenlet

def test1():
print(12)
gr2.switch()
print(34)
gr2.switch() def test2():
print(56)
gr1.switch()
print(78) gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent: (实现遇到IO自动切换)

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

#自动切换
import gevent def foo():
print("Running in foo,foo开始>>>1")
gevent.sleep(2)
print('Explicit context switch to foo again foo完成>>>6') def bar():
print('Explicit精确的 context 内容 to bar bar开始>>>2')
gevent.sleep(1)
print("Imlicit context sitch back to bar bar结束>>>5") def func3():
print('running func3 func3开始>>>3')
gevent.sleep(0)
print('running func3 again func3结束>>>4') gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(func3),
])

结果:

结果:
Running in foo,foo开始>>>1
Explicit精确的 context 内容 to bar bar开始>>>2
running func3 func3开始>>>3
running func3 again func3结束>>>4
Imlicit context sitch back to bar bar结束>>>5
Explicit context switch to foo again foo完成>>>6
遇到sleep就自动切换,sleep后面的参数是秒。上面这个小程序最多2秒左右运行完
gevent是一个基于协程(coroutine)的Python网络函数库,通过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。

主要特性有以下几点:

    基于libev的快速事件循环,Linux上面的是epoll机制

    基于greenlet的轻量级执行单元

    API复用了Python标准库里的内容

    支持SSL的协作式sockets

    可通过线程池或c-ares实现DNS查询

    通过monkey patching功能来使得第三方模块变成协作式
import gevent

def func1():
print('\033[31;1mTom在跟Jack搞...\033[0m')
gevent.sleep(2)
print('\033[31;1mTom又回去跟继续跟Jack搞...\033[0m') def func2():
print('\033[32;1mTom切换到了跟Sunny搞...\033[0m')
gevent.sleep(1)
print('\033[32;1mTom搞完了Jack,回来继续跟Sunny搞...\033[0m') gevent.joinall([
gevent.spawn(func1),
gevent.spawn(func2),
#gevent.spawn(func3),
])

结果:

Tom在跟Jack搞...
Tom切换到了跟Sunny搞...
Tom搞完了Jack,回来继续跟Sunny搞...
Tom又回去跟继续跟Jack搞...
Tom在跟Jack搞...
Tom切换到了跟Sunny搞...
Tom搞完了Jack,回来继续跟Sunny搞...
Tom又回去跟继续跟Jack搞...

通过gevent实现单线程下的多socket并发

server side

import sys
import socket
import time
import gevent from gevent import socket,monkey
monkey.patch_all() def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli) def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR) except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)

client side   

import socket

HOST = 'localhost'    # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data) print('Received', repr(data))
s.close()

简单的使用协程写一个爬虫:

串行

from urllib import request
import time def f(url):
print('GET:%s'%url)
resp = request.urlopen(url)
data = resp.read()
# file = open("data",'wb')#这里可以打开这两步,写入文件
# file.write(data)
print('%d bytes received from %s.'%(len(data),url)) #串性模式
urls = [
'https://www.python.org/',
'https://www.yahoo.com/',
'https://github.com/'] time_start = time.time()
for url in urls:
f(url)
print("同步cost",time.time()-time_start)

并行:

#因为gevent检测不到urllib是否进行了io操作,所以需要打补丁

from urllib import request
import gevent,time from gevent import monkey#打补丁(把下面有可能有IO操作的单独做上标记)
monkey.patch_all()#打补丁 def f(url):
print('GET:%s'%url)
resp = request.urlopen(url)
data = resp.read()
# file = open("data",'wb')#这里可以打开这两步,写入文件
# file.write(data)
print('%d bytes received from %s.'%(len(data),url)) #异步模式
async_time_start = time.time()
gevent.joinall([
gevent.spawn(f,'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f,'https://github.com/')
])
print("异步步cost",time.time()-async_time_start)

eventlet

eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。

其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:

    eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。

    虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。

关于Linux的epoll机制:

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll的优点:

    支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定,而epoll没有这个限制,它所支持的FD上限是最大可打开文件的数目,远大于2048。

    IO效率不随FD数目增加而线性下降:由于epoll只会对“活跃”的socket进行操作,于是,只有"活跃"的socket才会主动去调用 callback函数,其他idle状态的socket则不会。

    使用mmap加速内核与用户空间的消息传递。epoll是通过内核于用户空间mmap同一块内存实现的。

    内核微调。

libev机制

提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,比如socket可读事件,libev会对所注册的事件的源进行管理,并在事件发生时触发相应的程序。

官方文档中的示例:
>>> import gevent

>>> from gevent import socket

>>> urls = ['www.google.com.hk','www.example.com', 'www.python.org'  ]

>>> jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

>>> gevent.joinall(jobs, timeout=2)

>>> [job.value for job in jobs]

['74.125.128.199', '208.77.188.166', '82.94.164.162']

注解:gevent.spawn()方法spawn一些jobs,然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口,但它不会阻塞整个解释器,因此会使得其他的greenlets跟随着无阻的请求而执行。

Monket patching

Python的运行环境允许我们在运行时修改大部分的对象,包括模块、类甚至函数。虽然这样做会产生“隐式的副作用”,而且出现问题很难调试,但在需要修改Python本身的基础行为时,Monkey patching就派上用场了。Monkey patching能够使得gevent修改标准库里面大部分的阻塞式系统调用,包括socket,ssl,threading和select等模块,而变成协作式运行。

>>> from gevent import monkey ;

>>> monkey . patch_socket ()

>>> import urllib2

通过monkey.patch_socket()方法,urllib2模块可以使用在多微线程环境,达到与gevent共同工作的目的。

事件循环

不像其他网络库,gevent和eventlet类似, 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor),在twisted中是有 reactor的。当gevent的API函数想阻塞时,它获得Hub实例(执行时间循环的greenlet),并切换过去。如果没有集线器实例则会动态 创建。

libev提供的事件循环默认使用系统最快轮询机制,设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS=1为select, LIBEV_FLAGS = 2为poll, LIBEV_FLAGS = 4为epoll,LIBEV_FLAGS = 8为kqueue。

Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行,因此使用同步greenlet的API。可以使用spawn()和Event.set()等异步API。

同步与异步的性能区别

import gevent

def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(0.5)
print('Task %s done' % pid) def synchronous():
for i in range(1,10):
task(i) def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads) print('Synchronous:')
synchronous() print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。  

遇到IO阻塞时会自动切换任务

from gevent import monkey; monkey.patch_all()
import gevent
from urllib.request import urlopen def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

通过gevent实现单线程下的多socket并发

server side

import sys
import socket
import time
import gevent from gevent import socket,monkey
monkey.patch_all() def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli) def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR) except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)

client side   

import socket

HOST = 'localhost'    # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data) print('Received', repr(data))
s.close()

并发100个sock连接

import socket
import threading def sock_conn(): client = socket.socket() client.connect(("localhost",8001))
count = 0
while True:
#msg = input(">>:").strip()
#if len(msg) == 0:continue
client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
count +=1
client.close() for i in range(100):
t = threading.Thread(target=sock_conn)
t.start() 并发100个sock连接

事件驱动

事件驱动I/O本质上来讲就是将基本I/O操作(比如读和写)转化为你程序需要处理的事件。

计算机程序分类

鼠标的一个点击,移动,键盘的按键按下等等操作,都是对应操作系统的一个事件,然后应用程序接受你的操作进行处理
通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)种方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式,这也是本文讨论的重点—事件驱动处理库。

所有的计算机程序都可以大致分为两类:脚本型(单次运行)和连续运行型(直到用户主动退出)。

脚本型
脚本型的程序包括最早的批处理文件以及使用Python做交易策略回测等等,这类程序的特点是在用户启动后会按照编程时设计好的步骤一步步运行,所有步骤运行完后自动退出。

连续运行型
连续运行型的程序包含了操作系统和绝大部分我们日常使用的软件等等,这类程序启动后会处于一个无限循环中连续运行,直到用户主动退出时才会结束。

连续运行型程序

我们要开发的交易系统就是属于连续运行型程序,而这种程序根据其计算逻辑的运行机制不同,又可以粗略的分为时间驱动和事件驱动两种。

时间驱动

时间驱动的程序逻辑相对容易设计,简单来说就是让电脑每隔一段时间自动做一些事情。这个事情本身可以很复杂、包括很多步骤,但这些步骤都是线性的,按照顺序一步步执行下来。

以下代码展示了一个非常简单的时间驱动的Python程序。

from time import sleep

def demo():
print u'时间驱动的程序每隔1秒运行demo函数' while 1:
demo()
sleep(1.0)

时间驱动的程序本质上就是每隔一段时间固定运行一次脚本(上面代码中的demo函数)。尽管脚本自身可以很长、包含非常多的步骤,但是我们可以看出这种程序的运行机制相对比较简单、容易理解。

举一些量化交易相关的例子:

  1. 每隔5分钟,通过新浪财经网页的公开API读取一次沪深300成分股的价格,根据当日涨幅进行排序后输出到电脑屏幕上。
  2. 每隔1秒钟,检查一次最新收到的股指期货TICK数据,更新K线和其他技术指标,检查是否满足趋势策略的下单条件,若满足则执行下单。

对速度要求较高的量化交易方面(日内CTA策略、高频策略等等),时间驱动的程序会存在一个非常大的缺点:对数据信息在反应操作上的处理延时。例子2中,在每次逻辑脚本运行完等待的那1秒钟里,程序对于接收到的新数据信息(行情、成交推送等等)是不会做出任何反应的,只有在等待时间结束后脚本再次运行时才会进行相关的计算处理。而处理延时在量化交易中的直接后果就是money:市价单滑点、限价单错过本可成交的价格。

时间驱动的程序在量化交易方面还存在一些其他的缺点:如浪费CPU的计算资源、实现异步逻辑复杂度高等等。

事件驱动

与时间驱动对应的就是事件驱动的程序:当某个新的事件被推送到程序中时(如API推送新的行情、成交),程序立即调用和这个事件相对应的处理函数进行相关的操作。

上面例子2的事件驱动版:交易程序对股指TICK数据进行监听,当没有新的行情过来时,程序保持监听状态不进行任何操作;当收到新的数据时,数据处理函数立即更新K线和其他技术指标,并检查是否满足趋势策略的下单条件执行下单。

对于简单的程序,我们可以采用上面测试代码中的方案,直接在API的回调函数中写入相应的逻辑。但随着程序复杂度的增加,这种方案会变得越来越不可行。假设我们有一个带有图形界面的量化交易系统,系统在某一时刻接收到了API推送的股指期货行情数据,针对这个数据系统需要进行如下处理:

  1. 更新图表上显示的K线图形(绘图)
  2. 更新行情监控表中股指期货的行情数据(表格更新)
  3. 策略1需要运行一次内部算法,检查该数据是否会触发策略进行下单(运算、下单)
  4. 策略2同样需要运行一次内部算法,检查该数据是否会触发策略进行下单(运算、下单)
  5. 风控系统需要检查最新行情价格是否会导致账户的整体风险超限,若超限需要进行报警(运算、报警)

此时将上面所有的操作都写到一个回调函数中无疑变成了非常差的方案,代码过长容易出错不说,可扩展性也差,每添加一个策略或者功能则又需要修改之前的源代码(有经验的读者会知道,经常修改生产代码是一种非常危险的运营管理方法)。

为了解决这种情况,我们需要用到事件驱动引擎来管理不同事件的事件监听函数并执行所有和事件驱动相关的操作。

事件驱动引擎原理

事件驱动模式可以进一步抽象理解为由事件源,事件对象,以及事件监听器三元素构成,能完成监听器监听事件源、事件源发送事件,监听器收到事件后调用响应函数的动作。

事件驱动主要包含以下元素和操作函数:
元素
1.事件源
2.事件监听器
3.事件对象

操作函数
4.监听动作
5.发送事件
6.调用监听器响应函数

了解清楚了事件驱动的工作原理后,读者可以试着用自己熟悉的编程语言实现,编程主要实现下面的内容,笔者后续给python实现

用户根据实际业务逻辑定义
事件源 EventSources
监听器 Listeners

事件管理者 EventManager
成员
1.响应函数队列 Handlers
2.事件对象 Event
3.事件对象列表 EventQueue
操作函数
4.监听动作 AddEventListener
5.发送事件 SendEvent
6.调用响应函数 EventProcess

在实际的软件开发过程中,你会经常看到事件驱动的影子,几乎所有的GUI界面都采用事件驱动编程模型,很多服务器网络模型的消息处理也会采用,甚至复杂点的数据库业务处理也会用这种模型,因为这种模型解耦事件发送者和接收者之间的联系,事件可动态增加减少接收者,业务逻辑越复杂,越能体现它的优势。

论事件驱动模型
在UI编程中,,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?
方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:

  1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
  2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
  3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
    所以,该方式是非常不好的。

方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:

  1. 有一个事件(消息)队列;
  2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
  3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
  4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
     
    python之协程与IO操作

    事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
     
    让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。
    python之协程与IO操作

最初的问题:怎么确定IO操作完了切回去呢?通过回调函数

在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务,而且…
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

事件驱动编程是指程序的执行流程取决于事件的编程风格,事件由事件处理程序或者事件回调函数进行处理。当某些重要的事件发生时-- 例如数据库查询结果可用或者用户单击了某个按钮,就会调用事件回调函数。

事件驱动编程风格和事件循环相伴相生,事件循环是一个处于不间断循环中的结构,该结构主要具有两项功能-- 事件检测和事件触发处理,在每一轮循环中,它都必须检测发生了什么事件。当事件发生时,事件循环还要决定调用哪个回调函数。

事件循环只是在一个进程中运行的单个线程,这意味着当事件发生时,可以不用中断就运行事件处理程序,这样做有以下两个特点:

在任一给定时刻,最多运行一个事件处理程序。

事件处理程序可以不间断地运行直到结束。

这使得程序员能放宽同步要求,并且不必担心执行并发线程会改变共享内存的状态。

众所周知的秘密

在相当一段时间内,系统编程领域已经知道事件驱动编程是创建处理众多并发连接的服务的最佳方法。众所周知,由于不用保存很多上下文,因此节省了大量内存;又因为也没有那么多上下文切换,又节省了大量执行时间。

常用的server端linux高并发编程模型

Nginx Vs Apache

大名鼎鼎的Nginx使用了多进程模型,主进程启动时初始化,bind,监听一组sockets,然后fork一堆child processes(workers),workers共享socket descriptor。workers竞争accept_mutex,获胜的worker通过IO multiplex(select/poll/epoll/kqueue/...)来处理成千上万的并发请求。为了获得高性能,Nginx还大量使用了异步,事件驱动,non-blocking IO等技术。"What resulted is a modular, event-driven, asynchronous, single-threaded, non-blocking architecture which became the foundation of nginx code."

python之协程与IO操作Nginx 架构

对比着看一下Apache的两种常用运行模式,详见 Apache Modules

1. Apache MPM prefork模式

主进程通过进程池维护一定数量(可配置)的worker进程,每个worker进程负责一个connection。worker进程之间通过竞争mpm-accept mutex实现并发和链接处理隔离。 由于进程内存开销和切换开销,该模式相对来说是比较低效的并发。

python之协程与IO操作

2. Apache MPM worker模式

由于进程开销较大,MPM worker模式做了改进,处理每个connection的实体改为thread。主进程启动可配数量的子进程,每个进程启动可配数量的server threads和listen thread。listen threads通过竞争mpm-accept mutex获取到新进的connection request通过queue传递给自己进程所在的server threads处理。由于调度的实体变成了开销较小的thread,worker模式相对prefork具有更好的并发性能。

小结两种webserver,可以发现Nginx使用了更高效的编程模型,worker进程一般跟CPU的core数量相当,每个worker驻留在一个core上,合理编程可以做到最小程度的进程切换,而且内存的使用也比较经济,基本上没有浪费在进程状态的存储上。而Apache的模式是每个connection对应一个进程/线程,进程/线程间的切换开销,大量进程/线程的内存开销,cache miss的概率增大,都限制了系统所能支持的并发数。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

注:总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O
机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache
)中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

IO策略

由于IO的处理速度要远远低于CPU的速度,运行在CPU上的程序不得不考虑IO在准备暑假的过程中该干点什么,让出CPU给别人还是自己去干点别的有意义的事情,这就涉及到了采用什么样的IO策略。一般IO策略的选用跟进程线程编程模型要同时考虑,两者是有联系的。

Linux IO模型

网络IO的本质是socket的读取,socket在linux系统被抽象为流,IO可以理解为对流的操作。刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  1. 第一阶段:等待数据准备 (Waiting for the data to be ready)。

  2. 第二阶段:将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)。

对于socket流而言,

  1. 第一步:通常涉及等待网络上的数据分组到达,然后被复制到内核的某个缓冲区。

  2. 第二步:把数据从内核缓冲区复制到应用进程缓冲区。

网络应用需要处理的无非就是两大类问题,网络IO,数据计算。相对于后者,网络IO的延迟,给应用带来的性能瓶颈大于后者。

IO介绍

IO在计算机中指Input/Output,也就是输入和输出。由于程序和运行时数据是在内存中驻留,由CPU这个超快的计算核心来执行,涉及到数据交换的地方,通常是磁盘、网络等,就需要IO接口。

比如你打开浏览器,访问新浪首页,浏览器这个程序就需要通过网络IO获取新浪的网页。浏览器首先会发送数据给新浪服务器,告诉它我想要首页的HTML,这个动作是往外发数据,叫Output,随后新浪服务器把网页发过来,这个动作是从外面接收数据,叫Input。所以,通常,程序完成IO操作会有Input和Output两个数据流。当然也有只用一个的情况,比如,从磁盘读取文件到内存,就只有Input操作,反过来,把数据写到磁盘文件里,就只是一个Output操作。

IO编程中,Stream(流)是一个很重要的概念,可以把流想象成一个水管,数据就是水管里的水,但是只能单向流动。Input Stream就是数据从外面(磁盘、网络)流进内存,Output Stream就是数据从内存流到外面去。对于浏览网页来说,浏览器和新浪服务器之间至少需要建立两根水管,才可以既能发数据,又能收数据。

由于CPU和内存的速度远远高于外设的速度,所以,在IO编程中,就存在速度严重不匹配的问题。举个例子来说,比如要把100M的数据写入磁盘,CPU输出100M的数据只需要0.01秒,可是磁盘要接收这100M数据可能需要10秒,怎么办呢?有两种办法:

第一种是CPU等着,也就是程序暂停执行后续代码,等100M的数据在10秒后写入磁盘,再接着往下执行,这种模式称为同步IO;

另一种方法是CPU不等待,只是告诉磁盘,“您老慢慢写,不着急,我接着干别的事去了”,于是,后续代码可以立刻接着执行,这种模式称为异步IO。

同步和异步的区别就在于是否等待IO执行的结果。好比你去麦当劳点餐,你说“来个汉堡”,服务员告诉你,对不起,汉堡要现做,需要等5分钟,于是你站在收银台前面等了5分钟,拿到汉堡再去逛商场,这是同步IO。

你说“来个汉堡”,服务员告诉你,汉堡需要等5分钟,你可以先去逛商场,等做好了,我们再通知你,这样你可以立刻去干别的事情(逛商场),这是异步IO。

很明显,使用异步IO来编写程序性能会远远高于同步IO,但是异步IO的缺点是编程模型复杂。想想看,你得知道什么时候通知你“汉堡做好了”,而通知你的方法也各不相同。如果是服务员跑过来找到你,这是回调模式,如果服务员发短信通知你,你就得不停地检查手机,这是轮询模式。总之,异步IO的复杂度远远高于同步IO。

操作IO的能力都是由操作系统提供的,每一种编程语言都会把操作系统提供的低级C接口封装起来方便使用,Python也不例外。我们后面会详细讨论Python的IO编程接口。

接触网络编程,我们时常会与各种与IO相关的概念打交道:同步(Synchronous)、异步(ASynchronous)、阻塞(blocking)和非阻塞(non-blocking)

同步与异步的主要区别就在于:会不会导致请求进程(或线程)阻塞。同步会使请求进程(或线程)阻塞而异步不会。

linux下有五种常见的IO模型,其中只有一种异步模型,其余皆为同步模型。如图:

python之协程与IO操作

同步:
      所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。也就是必须一件一件事做,等前一件做完了才能做下一件事。

例如普通B/S模式(同步):提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能干任何事

异步:
      异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

例如 ajax请求(异步): 请求通过事件触发->服务器处理(这是浏览器仍然可以作其他事情)->处理完毕

这就是同步和异步。举个简单的例子,假如有一个任务包括两个子任务A和B,对于同步来说,当A在执行的过程中,B只有等待,直至A执行完毕,B才能执行;而对于异步就是A和B可以并发地执行,B不必等待A执行完毕之后再执行,这样就不会由于A的执行导致整个任务的暂时等待。

阻塞
     阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。

有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。 例如,我们在socket中调用recv函数,如果缓冲区中没有数据,这个函数就会一直等待,直到有数据才返回。而此时,当前线程还会继续处理各种各样的消息。

非阻塞
      非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
对象的阻塞模式和阻塞函数调用
对象是否处于阻塞模式和函数是不是阻塞调用有很强的相关性,但是并不是一一对应的。阻塞对象上可以有非阻塞的调用方式,我们可以通过一定的API去轮询状 态,在适当的时候调用阻塞函数,就可以避免阻塞。而对于非阻塞对象,调用特殊的函数也可以进入阻塞调用。函数select就是这样的一个例子。

这就是阻塞和非阻塞的区别。也就是说阻塞和非阻塞的区别关键在于当发出请求一个操作时,如果条件不满足,是会一直等待还是返回一个标志信息。

1. 同步,就是我调用一个功能,该功能没有结束前,我死等结果。
2. 异步,就是我调用一个功能,不需要知道该功能结果,该功能有结果后通知我(回调通知)
3. 阻塞,      就是调用我(函数),我(函数)没有接收完数据或者没有得到结果之前,我不会返回。
4. 非阻塞,  就是调用我(函数),我(函数)立即返回,通过select通知调用者

同步IO和异步IO的区别就在于:数据拷贝的时候进程是否阻塞!

阻塞IO和非阻塞IO的区别就在于:应用程序的调用是否立即返回!

对于举个简单c/s 模式:

同步:提交请求->等待服务器处理->处理完毕返回这个期间客户端浏览器不能干任何事
异步:请求通过事件触发->服务器处理(这是浏览器仍然可以作其他事情)->处理完毕
同步和异步都只针对于本机SOCKET而言的。

同步和异步,阻塞和非阻塞,有些混用,其实它们完全不是一回事,而且它们修饰的对象也不相同。
阻塞和非阻塞是指当进程访问的数据如果尚未就绪,进程是否需要等待,简单说这相当于函数内部的实现区别,也就是未就绪时是直接返回还是等待就绪;

而同步和异步是指访问数据的机制,同步一般指主动请求并等待I/O操作完毕的方式,当数据就绪后在读写的时候必须阻塞(区别就绪与读写二个阶段,同步的读写必须阻塞),异步则指主动请求数据后便可以继续处理其它任务,随后等待I/O,操作完毕的通知,这可以使进程在数据读写时也不阻塞。(等待"通知")

网络IO的模型大致有如下几种:

1)阻塞I/O(blocking I/O)
2)非阻塞I/O (nonblocking I/O)
3) I/O复用(select 和poll) (I/O multiplexing)
4)信号驱动I/O (signal driven I/O (SIGIO))
5)异步I/O (asynchronous I/O (the POSIX aio_functions))

前四种都是同步,只有最后一种才是异步IO。

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

在深入介绍Linux IO各种模型之前,让我们先来探索一下基本 Linux IO 模型的简单矩阵。如下图所示:

python之协程与IO操作

每个 IO 模型都有自己的使用模式,它们对于特定的应用程序都有自己的优点。本节将简要对其一一进行介绍。常见的IO模型有阻塞、非阻塞、IO多路复用,异步。

I/O模型

阻塞式I/O

非阻塞式I/O

I/O复用;

信号驱动式I/O

异步I/O

一个输入操作通常包括两个不同的阶段:

1) 等待数据准备好;

2) 从内核向进程复制数据;

对于一个套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。

网络IO操作实际过程涉及到内核和调用这个IO操作的进程。以read为例,read的具体操作分为以下两个部分:

(1)内核等待数据可读

(2)将内核读到的数据拷贝到进程

阻塞I/O模型:

简介:进程会一直阻塞,直到数据拷贝完成

应用程序调用一个IO函数,导致应用程序阻塞,等待数据准备好。 如果数据没有准备好,一直等待….数据准备好了,从内核拷贝到用户空间,IO函数返回成功指示。

最流行的I/O模型是阻塞式I/O(blocking I/O)模型,默认情况下,所有套接字都是阻塞的。以数据报套接字作为例子,我们有如图6-1所示的情形。

阻塞I/O模型图:在调用recv()/recvfrom()函数时,发生在内核中等待数据和复制数据的过程。

python之协程与IO操作

同步阻塞IO

python之协程与IO操作

当调用recv()函数时,系统首先查是否有准备好的数据。如果数据没有准备好,那么系统就处于等待状态。当数据准备好后,将数据从系统缓冲区复制到用户空间,然后该函数返回。在套接应用程序中,当调用recv()函数时,未必用户空间就已经存在数据,那么此时recv()函数就会处于等待状态。

当使用socket()函数和WSASocket()函数创建套接字时,默认的套接字都是阻塞的。这意味着当调用Windows Sockets API不能立即完成时,线程处于等待状态,直到操作完成。

并不是所有Windows Sockets API以阻塞套接字为参数调用都会发生阻塞。例如,以阻塞模式的套接字为参数调用bind()、listen()函数时,函数会立即返回。将可能阻塞套接字的Windows Sockets API调用分为以下四种:

1.输入操作: recv()、recvfrom()、WSARecv()和WSARecvfrom()函数。以阻塞套接字为参数调用该函数接收数据。如果此时套接字缓冲区内没有数据可读,则调用线程在数据到来前一直睡眠。

2.输出操作: send()、sendto()、WSASend()和WSASendto()函数。以阻塞套接字为参数调用该函数发送数据。如果套接字缓冲区没有可用空间,线程会一直睡眠,直到有空间。

3.接受连接:accept()和WSAAcept()函数。以阻塞套接字为参数调用该函数,等待接受对方的连接请求。如果此时没有连接请求,线程就会进入睡眠状态。

4.外出连接:connect()和WSAConnect()函数。对于TCP连接,客户端以阻塞套接字为参数,调用该函数向服务器发起连接。该函数在收到服务器的应答前,不会返回。这意味着TCP连接总会等待至少到服务器的一次往返时间。

  使用阻塞模式的套接字,开发网络程序比较简单,容易实现。当希望能够立即发送和接收数据,且处理的套接字数量比较少的情况下,使用阻塞模式来开发网络程序比较合适。

阻塞模式套接字的不足表现为,在大量建立好的套接字线程之间进行通信时比较困难。当使用“生产者-消费者”模型开发网络程序时,为每个套接字都分别分配一个读线程、一个处理数据线程和一个用于同步的事件,那么这样无疑加大系统的开销。其最大的缺点是当希望同时处理大量套接字时,将无从下手,其扩展性很差

非阻塞IO模型

       简介:非阻塞IO通过进程反复调用IO函数(多次系统调用,并马上返回);在数据拷贝的过程中,进程是阻塞的;

进程把一个套接字设置成非阻塞是在通知内核:当所请求的I/O操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误.

我们把一个SOCKET接口设置为非阻塞就是告诉内核,当所请求的I/O操作无法完成时,不要将进程睡眠,而是返回一个错误。这样我们的I/O操作函数将不断的测试数据是否已经准备好,如果没有准备好,继续测试,直到数据准备好为止。在这个不断测试的过程中,会大量的占用CPU的时间。

把SOCKET设置为非阻塞模式,即通知系统内核:在调用Windows Sockets API时,不要让线程睡眠,而应该让函数立即返回。在返回时,该函数返回一个错误代码。图所示,一个非阻塞模式套接字多次调用recv()函数的过程。前三次调用recv()函数时,内核数据还没有准备好。因此,该函数立即返回WSAEWOULDBLOCK错误代码。第四次调用recv()函数时,数据已经准备好,被复制到应用程序的缓冲区中,recv()函数返回成功指示,应用程序开始处理数据。

python之协程与IO操作

python之协程与IO操作

前三次调用recvfrom时没有数据可返回,因此内核转而立即返回一个EWOULDBLOCK错误。第四次调用recvfrom时已有一个数据报准备好,它被复制到应用进程缓冲区,于是recvfrom成功返回。我们接着处理数据。

当一个应用进程像这样对一个非阻塞描述符循环调用recvfrom时,我们称之为轮询(polling)。应用进程只需轮询内核,以查看某个操作是否就绪。这么做往往耗费大量CPU时间。

当使用socket()函数和WSASocket()函数创建套接字时,默认都是阻塞的。在创建套接字之后,通过调用ioctlsocket()函数,将该套接字设置为非阻塞模式。Linux下的函数是:fcntl().
   
套接字设置为非阻塞模式后,在调用Windows Sockets
API函数时,调用函数会立即返回。大多数情况下,这些函数调用都会调用“失败”,并返回WSAEWOULDBLOCK错误代码。说明请求的操作在调用期间内没有时间完成。通常,应用程序需要重复调用该函数,直到获得成功返回代码。

需要说明的是并非所有的Windows Sockets
API在非阻塞模式下调用,都会返回WSAEWOULDBLOCK错误。例如,以非阻塞模式的套接字为参数调用bind()函数时,就不会返回该错误代码。当然,在调用WSAStartup()函数时更不会返回该错误代码,因为该函数是应用程序第一调用的函数,当然不会返回这样的错误代码。

要将套接字设置为非阻塞模式,除了使用ioctlsocket()函数之外,还可以使用WSAAsyncselect()和WSAEventselect()函数。当调用该函数时,套接字会自动地设置为非阻塞方式。

  由于使用非阻塞套接字在调用函数时,会经常返回WSAEWOULDBLOCK错误。所以在任何时候,都应仔细检查返回代码并作好对“失败”的准备。应用程序连续不断地调用这个函数,直到它返回成功指示为止。上面的程序清单中,在While循环体内不断地调用recv()函数,以读入1024个字节的数据。这种做法很浪费系统资源。

要完成这样的操作,有人使用MSG_PEEK标志调用recv()函数查看缓冲区中是否有数据可读。同样,这种方法也不好。因为该做法对系统造成的开销是很大的,并且应用程序至少要调用recv()函数两次,才能实际地读入数据。较好的做法是,使用套接字的“I/O模型”来判断非阻塞套接字是否可读可写。

非阻塞模式套接字与阻塞模式套接字相比,不容易使用。使用非阻塞模式套接字,需要编写更多的代码,以便在每个Windows Sockets API函数调用中,对收到的WSAEWOULDBLOCK错误进行处理。因此,非阻塞套接字便显得有些难于使用。

但是,非阻塞套接字在控制建立的多个连接,在数据的收发量不均,时间不定时,明显具有优势。这种套接字在使用上存在一定难度,但只要排除了这些困难,它在功能上还是非常强大的。通常情况下,可考虑使用套接字的“I/O模型”,它有助于应用程序通过异步方式,同时对一个或多个套接字的通信加以管理。

IO复用模型:

简介:主要是select和epoll;对一个IO端口,两次调用,两次返回,比阻塞IO并没有什么优越性;关键是能实现同时对多个IO端口进行监听;

I/O复用模型会用到select、poll、epoll函数,这几个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。

python之协程与IO操作

我们阻塞于select调用,等待数据报套接字变为可读。当select返回套接字可读这一条件时,我们调用recvfrom把所读数据报复制到应用进程缓冲区。

python之协程与IO操作

比较图6-3和图6-1,I/O复用并不显得有什么优势,事实上由于使用select需要两个而不是单个系统调用,I/O复用还稍有劣势。使用select的优势在于我们可以等待多个描述符就绪。

与I/O复用密切相关的另一种I/O模型是在多线程中使用阻塞式I/O(我们经常这么干)。这种模型与上述模型极为相似,但它并没有使用select阻塞在多个文件描述符上,而是使用多个线程(每个文件描述符一个线程),这样每个线程都可以*的调用recvfrom之类的阻塞式I/O系统调用了。

信号驱动IO

简介:两次调用,两次返回;

我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动式I/O(signal-driven I/O),图6-4是它的概要展示。

首先我们允许套接口进行信号驱动I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用I/O操作函数处理数据。

python之协程与IO操作

我们首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用安装一个信号处理函数。改系统调用将立即返回,我们的进程继续工作,也就是说他没有被阻塞。当数据报准备好读取时,内核就为该进程产生一个SIGIO信号。我们随后就可以在信号处理函数中调用recvfrom读取数据报,并通知主循环数据已经准备好待处理,也可以立即通知主循环,让它读取数据报。

无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间进程不被阻塞。主循环可以继续执行,只要等到来自信号处理函数的通知:既可以是数据已准备好被处理,也可以是数据报已准备好被读取。

异步IO模型

简介:数据拷贝的时候进程无需阻塞。

  异步I/O(asynchronous I/O)由POSIX规范定义。演变成当前POSIX规范的各种早起标准所定义的实时函数中存在的差异已经取得一致。一般地说,这些函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。这种模型与前一节介绍的信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。图6-5给出了一个例子。

     当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者的输入输出操作

python之协程与IO操作

异步非阻塞IO

python之协程与IO操作

对比同步非阻塞IO,异步非阻塞IO也有个名字--Proactor。这种策略是真正的异步,使用注册callback/hook函数来实现异步。程序注册自己感兴趣的socket 事件时,同时将处理各种事件的handler也就是对应的函数也注册给内核,不会有任何阻塞式调用。事件发生后内核之间调用对应的handler完成处理。这里暂且理解为内核做了event的调度和handler调用,具体到底是异步IO库如何做的,如何跟内核通信的,后续继续研究。

同步IO引起进程阻塞,直至IO操作完成。
异步IO不会引起进程阻塞。
IO复用是先通过select调用阻塞。

我们调用aio_read函数(POSIX异步I/O函数以aio_或lio_开头),给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,并且在等待I/O完成期间,我们的进程不被阻塞。本例子中我们假设要求内核在操作完成时产生某个信号。改信号直到数据已复制到应用进程缓冲区才产生,这一点不同于信号驱动I/O模型。

各种I/O模型的比较

图6-6对比了上述5中不同的I/O模型。可以看出,前4中模型的主要区别在于第一阶段,因为他们的第二阶段是一样的:在数据从内核复制到调用者的缓冲区期间,进程阻塞于recvfrom调用。相反,异步I/O模型在这两个阶段都要处理,从而不同于其他4中模型。

python之协程与IO操作

同步I/O和异步I/O对比

POSIX把这两个术语定于如下:

同步I/O操作(sysnchronous I/O opetation)导致请求进程阻塞,直到I/O操作完成;

异步I/O操作(asynchronous I/O opetation)不导致请求进程阻塞。

根据上述定义,我们的前4种模型----阻塞式I/O模型、非阻塞式I/O模型、I/O复用模型和信号驱动I/O模型都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将阻塞进程。只有异步I/O模型与POSIX定义的异步I/O相匹配。

 IO多路复用之select、poll、epoll详解

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

  1. 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。

  2. 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

  3. 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

  4. 如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

  5. 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

目前支持I/O多路复用的系统调用有 select,pselect,poll,epoll,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作但select,pselect,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

对于IO多路复用机制不理解的同学,可以先行参考《聊聊Linux 五种IO模型》,来了解Linux五种IO模型。

1 select、poll、epoll简介

epoll跟select都能提供多路I/O复用的解决方案。在现在的Linux内核里有都能够支持,其中epoll是Linux所特有,而select则应该是POSIX所规定,一般操作系统均有实现。

1.1 select

基本原理:

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。

基本概念

  IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

  (1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。

  (2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

  (3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

  (4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

  (5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

  与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

select的调用过程如下所示:

python之协程与IO操作

(1)使用copy_from_user从用户空间拷贝fd_set到内核空间

(2)注册回调函数__pollwait

(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

(6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

(7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

(8)把fd_set从内核空间拷贝到用户空间。

基本流程,如图所示:

python之协程与IO操作
输入图片说明

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

python之协程与IO操作

调用select的函数为r, w, e = select.select(rlist, wlist, xlist[, timeout]),前三个参数都分别是三个列表,数组中的对象均为waitable object:均是整数的文件描述符(file descriptor)或者一个拥有返回文件描述符方法fileno()的对象;

  • rlist: 等待读就绪的list
  • wlist: 等待写就绪的list
  • errlist: 等待“异常”的list
select方法用来监视文件描述符,如果文件描述符发生变化,则获取该描述符。
1、这三个list可以是一个空的list,但是接收3个空的list是依赖于系统的(在Linux上是可以接受的,但是在window上是不可以的)。
2、当 rlist 序列中的描述符发生可读时(accetp和read),则获取发生变化的描述符并添加到 r 序列中
3、当 wlist 序列中含有描述符时,则将该序列中所有的描述符添加到 w 序列中
4、当 errlist序列中的句柄发生错误时,则将该发生错误的句柄添加到 e序列中
5、当 超时时间 未设置,则select会一直阻塞,直到监听的描述符发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的描述符(fd)有变化,则直接执行。

6、在list中可以接受Ptython的的file对象(比如sys.stdin,或者会被open()os.open()返回的object),socket object将会返回socket.socket()。也可以自定义类,只要有一个合适的fileno()的方法(需要真实返回一个文件描述符,而不是一个随机的整数)。

select代码注释

import select
import socket
import sys
import queue # 创建一个TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
# 绑定socket到指定端口
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
# 监听连接的地址
server.listen(5)
inputs = [server]
# Socket的读操作
outputs = []
# socket的写操作
message_queues = {}
while inputs:
# Wait for at least one of the sockets to be ready for processing
print( '\nwaiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
# 监听句柄序列,如果某个发生变化,select的第一个rLest会拿到数据,output只要有数据wLest就能获取到,select的第三个参数inputs用来监测异常,并赋值给exceptional。
# 监听inputs,outputs,inputs 如果他们的值有变化,就将分别赋值给readable,writable,exceptional。
for s in readable:
# 遍历readable的值。
if s is server:
connection, client_address = s.accept()
# 如果s 是server,那么server socket将接收连接。
print('new connection from', client_address)
# 打印出连接客户端的地址。
connection.setblocking(False)
# 设置socket 为非阻塞模式。
inputs.append(connection)
# 因为有读操作发生,所以将此连接加入inputs
message_queues[connection] = queue.Queue()
# 为每个连接创建一个queue队列。使得每个连接接收到正确的数据。
else:
data = s.recv(1024)
# 如果s不是server,说明客户端连接来了,那么就接受客户端的数据。
if data:
# 如果接收到客户端的数据
print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
message_queues[s].put(data)
# 将收到的数据放入队列中
if s not in outputs:
outputs.append(s)
# 将socket客户端的连接加入select的output中,并且用来返回给客户端数据。
else:
print('closing', client_address, 'after reading no data')
# 如果没有收到客户端发来的空消息,则说明客户端已经断开连接。
if s in outputs:
outputs.remove(s)
# 既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
inputs.remove(s)
# inputs中也删除掉
s.close()
# 把这个连接关闭掉
del message_queues[s]
# 删除此客户端的消息队列 for s in writable:
# 遍历output的数据
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# 获取对应客户端消息队列中的数据,如果队列中的数据为空,从消息队列中移除此客户端连接。
print('output queue for', s.getpeername(), 'is empty')
outputs.remove(s)
else:
print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
s.send(next_msg)
# 如果消息队列有数据,则发送给客户端。
for s in exceptional:
# 处理 "exceptional conditions"
print('handling exceptional condition for', s.getpeername() )
inputs.remove(s)
# 取消对出现异常的客户端的监听
if s in outputs:
outputs.remove(s)
# 移除客户端的连接对象。
s.close()
# 关闭此socket连接
del message_queues[s]
# 删除此消息队列。 ''' 在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生, 轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。 epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统(文件系统一般用什么数据结构实现?B+树)。把原先的select/poll调用分成了3个部分: 1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源) 2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字 3)调用epoll_wait收集发生的事件的连接 '''

select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:

  1. select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。

    一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.

  2. 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。

    当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。

  3. 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。

select的几大缺点:

(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

(3)select支持的文件描述符数量太小了,默认是1024

poll

基本原理:

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

  1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。

  2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

注意:

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

在Python中调用poll

  • select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

  • poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的I/O事件发生。fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。

  • eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRIPOLLOUT的组合。如果缺省,默认会去检查所有的3种事件类型。

事件常量 意义
POLLIN 有数据读取
POLLPRT 有数据紧急读取
POLLOUT 准备输出:输出不会阻塞
POLLERR 某些错误情况出现
POLLHUP 挂起
POLLNVAL 无效请求:描述无法打开
  • poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的fd,会引起一个errnoENOENTIOError
  • poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引起KeyError
  • poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。

poll代码

#!/usr/bin/env python
#-*- coding:utf-8 -*- import socket
import select
import queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print("服务器启动成功,监听IP:", server_address) message_queues = {}
# 超时,毫秒
timeout = 5000
# 监听哪些事件
READ_ONLY = (select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY | select.POLLOUT)
# 新建轮询事件对象
poller = select.poll()
# 注册本机监听socket到等待可读事件事件集合
poller.register(server, READ_ONLY)
# 文件描述符到socket映射
fd_to_socket = {server.fileno(): server,}
while True:
print("等待活动连接......")
# 轮询注册的事件集合
events = poller.poll(timeout)
if not events:
print("poll超时,无活动连接,重新poll......") continue
print("有", len(events), "个新事件,开始处理......") for fd, flag in events:
s = fd_to_socket[fd]
# 可读事件
if flag & (select.POLLIN | select.POLLPRI):
if s is server:
# 如果socket是监听的server代表有新连接
connection, client_address = s.accept()
print("新连接:", client_address)
connection.setblocking(False) fd_to_socket[connection.fileno()] = connection
# 加入到等待读事件集合
poller.register(connection, READ_ONLY)
message_queues[connection] = Queue.Queue()
else:
# 接收客户端发送的数据
data = s.recv(1024)
if data:
print("收到数据:", data, "客户端:", s.getpeername())
message_queues[s].put(data)
# 修改读取到消息的连接到等待写事件集合
poller.modify(s, READ_WRITE)
else:
# Close the connection
print("closing", s.getpeername()) # Stop listening for input on the connection
poller.unregister(s)
s.close()
del message_queues[s]
# 连接关闭事件
elif flag & select.POLLHUP:
print(" Closing ", s.getpeername(), "(HUP)") poller.unregister(s)
s.close()
# 可写事件
elif flag & select.POLLOUT:
try:
msg = message_queues[s].get_nowait()
except Queue.Empty:
print(s.getpeername(), " queue empty") poller.modify(s, READ_ONLY)
else:
print("发送数据:", data, "客户端:", s.getpeername()) s.send(msg)
# 异常事件
elif flag & select.POLLERR:
print(" exception on", s.getpeername()) poller.unregister(s)
s.close()
del message_queues[s]

1.3 epoll

epoll的原理及改进

在linux2.6(准确来说是2.5.44)由内核直接支持的方法。epoll解决了select和poll的缺点。

  • 对于第一个缺点,epoll的解决方法是每次注册新的事件到epoll中,会把所有的fd拷贝进内核,而不是在等待的时候重复拷贝,保证了每个fd在整个过程中只会拷贝1次。
  • 对于第二个缺点,epoll没有这个限制,它所支持的fd上限是最大可以打开文件的数目,具体数目可以cat /proc/sys/fs/file-max查看,一般来说这个数目和系统内存关系比较大。
  • 对于第三个缺点,epoll的解决方法不像select和poll每次对所有fd进行遍历轮询所有fd集合,而是在注册新的事件时,为每个fd指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的fd加入一个就绪表中。(所以epoll实际只需要遍历就绪表)。

epoll同时支持水平触发和边缘触发:

  • 水平触发(level-triggered):只要满足条件,就触发一个事件(只要有数据没有被获取,内核就不断通知你)。e.g:在水平触发模式下,重复调用epoll.poll()会重复通知关注的event,直到与该event有关的所有数据都已被处理。(select, poll是水平触发, epoll默认水平触发)
  • 边缘触发(edge-triggered):每当状态变化时,触发一个事件。e.g:在边沿触发模式中,epoll.poll()在读或者写event在socket上面发生后,将只会返回一次event。调用epoll.poll()的程序必须处理所有和这个event相关的数据,随后的epoll.poll()调用不会再有这个event的通知。

在Python中调用epoll

  • select.epoll([sizehint=-1])返回一个epoll对象。

  • eventmask

事件常量 意义
EPOLLIN 读就绪
EPOLLOUT 写就绪
EPOLLPRI 有数据紧急读取
EPOLLERR assoc. fd有错误情况发生
EPOLLHUP assoc. fd发生挂起
EPOLLRT 设置边缘触发(ET)(默认的是水平触发)
EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用
EPOLLRDNORM 和 EPOLLIN 相等
EPOLLRDBAND 优先读取的数据带(data band)
EPOLLWRNORM 和 EPOLLOUT 相等
EPOLLWRBAND 优先写的数据带(data band)
EPOLLMSG 忽视
  • epoll.close()关闭epoll对象的文件描述符。
  • epoll.fileno返回control fd的文件描述符number。
  • epoll.fromfd(fd)用给予的fd来创建一个epoll对象。
  • epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError
  • epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。
  • epoll.unregister(fd)注销一个文件描述符。
  • epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。

基本原理:

epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

epoll代码注释

import socket, select
# 'windows'下不支持'epoll' EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
# 建立socket连接。
serversocket.setblocking(0)
# 因为socket本身是阻塞的,setblocking(0)使得socket不阻塞 epoll = select.epoll()
# 创建一个eopll对象
epoll.register(serversocket.fileno(), select.EPOLLIN)
# 在服务器端socket上面注册对读event的关注,一个读event随时会触发服务器端socket去接收一个socket连接。 try:
connections = {}; requests = {}; responses = {}
# 生成3个字典,connection字典是存储文件描述符映射到他们相应的网络连接对象
while True:
events = epoll.poll(1)
# 查询epoll对象,看是否有任何关注的event被触发,参数‘1’表示,会等待一秒来看是否有event发生,如果有任何感兴趣的event发生在这次查询之前,这个查询就会带着这些event的列表立即返回
for fileno, event in events:
# event作为一个序列(fileno,event code)的元组返回,fileno是文件描述符的代名词,始终是一个整数。
if fileno == serversocket.fileno():
# 如果一个读event在服务器端socket发生,就会有一个新的socket连接可能被创建。
connection, address = serversocket.accept()
# 服务器端开始接收连接和客户端地址
connection.setblocking(0)
# 设置新的socket为非阻塞模式
epoll.register(connection.fileno(), select.EPOLLIN)
# 为新的socket注册对读(EPOLLIN)event的关注
connections[connection.fileno()] = connection
requests[connection.fileno()] = b''
responses[connection.fileno()] = response
elif event & select.EPOLLIN:
requests[fileno] += connections[fileno].recv(1024)
# 如果发生一个读event,就读取从客户端发过来的数据。
if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
epoll.modify(fileno, select.EPOLLOUT)
# 一旦完成请求已经收到,就注销对读event的关注,注册对写(EPOLLOUT)event的关注,写event发生的时候,会回复数据给客户端。
print('-'*40 + '\n' + requests[fileno].decode()[:-2])
# 打印完整的请求,证明虽然与客户端的通信是交错进行的,但是数据可以作为一个整体来组装和处理。
elif event & select.EPOLLOUT:
# 如果一个写event在一个客户端socket上面发生,他会接受新的数据以便发送到客户端。
byteswritten = connections[fileno].send(responses[fileno])
responses[fileno] = responses[fileno][byteswritten:]
if len(responses[fileno]) == 0:
# 每次发送一部分响应数据,直到完整的响应数据都已经发送给操作系统等待传输给客户端。
epoll.modify(fileno, 0)
# 一旦完整的响应数据发送完成,就不再关注读或者写event。
connections[fileno].shutdown(socket.SHUT_RDWR)
# 如果一个连接显式关闭,那么socket shutdown是可选的,在这里这样使用,是为了让客户端首先关闭。
# shutdown调用会通知客户端socket没有更多的数据应该被发送或者接收,并会让功能正常的客户端关闭自己的socket连接。
elif event & select.EPOLLHUP:
# HUP挂起event表明客户端socket已经断开(即关闭),所以服务器端也需要关闭,没有必要注册对HUP event的关注,在socket上面,他们总是会被epoll对象注册。
epoll.unregister(fileno)
# 注销对此socket连接的关注。
connections[fileno].close()
# 关闭socket连接。
del connections[fileno]
finally:
epoll.unregister(serversocket.fileno())
# 去掉已经注册的文件句柄
epoll.close()
# 关闭epoll对象
serversocket.close()
# 关闭服务器连接
# 打开的socket连接不需要关闭,因为Python会在程序结束时关闭, 这里的显示关闭是个好的习惯。 ''' 首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。 不管是文件,还是套接字,还是管道,我们都可以把他们看作流。 之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。现在假定一个情形,
我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),
这时候该怎么办? 阻塞:阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);
那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。 非阻塞忙轮询:接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他挂个电话:“你到了没?” 很明显一般人不会用第二种做法,不仅显很无脑,浪费话费不说,还占用了快递员大量的时间。 大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片了。 为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),
当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。 假设有一个管道,进程A为管道的写入方,B为管道的读出方。 假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,
这个事件姑且称之为“缓冲区非空”。 但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,
B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。 假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满” 也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。 这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满(注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。
这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。 然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),
很不幸这两种方法效率都不高。 于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论): while true {
for i in stream[]; {
if i has data
read until unavailable
}
} 我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。
这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。 为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,
可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流
(于是我们可以把“忙”字去掉了)。代码长这样: while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
} 于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流
(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。 但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,没一次无差别轮询时间就越长。再次 说了这么多,终于能好好解释epoll了 epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。
(复杂度降低到了O(1)) 在讨论epoll的实现细节之前,先把epoll的相关操作列出: epoll_create 创建一个epoll对象,一般epollfd = epoll_create() epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件 比如 epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注册缓冲区非空事件,即有数据流入 epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注册缓冲区非满事件,即流可以被写入 epoll_wait(epollfd,...)等待直到注册的事件发生 (注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。而epoll只关心缓冲区非满和缓冲区非空事件)。 一个epoll模式的代码大概的样子是:
while true {
active_stream[] = epoll_wait(epollfd)
for i in active_stream[] {
read or write till
}
} '''

epoll的优点:

  1. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。

  2. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。

ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

  1. LT模式

    LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的

  2. ET模式

    ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)

    ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

  3. 在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

注意:

如果没有大量的idle-connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle-connection,就会发现epoll的效率大大高于select/poll。

2 select、poll、epoll区别

  1. 支持一个进程所能打开的最大连接数

    python之协程与IO操作
    输入图片说明
  2. FD剧增后带来的IO效率问题

    python之协程与IO操作
    输入图片说明
  3. 消息传递方式

    python之协程与IO操作
    输入图片说明

 相同点和不同点图:

python之协程与IO操作

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点:

  1. 表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

  2. select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善。

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。关于这三种IO多路复用的用法,前面三篇总结写的很清楚,并用服务器回射echo程序进行了测试。

总结:

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

1、表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

2、select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善

python之协程与IO操作

Linux并发网络编程模型

1  Apache 模型,简称 PPC ( Process Per Connection ,):为每个连接分配一个进程。主机分配给每个连接的时间和空间上代价较大,并且随着连接的增多,大量进程间切换开销也增长了。很难应对大量的客户并发连接。

2  TPC 模型( Thread Per Connection ):每个连接一个线程。和PCC类似。

3  select 模型:I/O多路复用技术。

.1 每个连接对应一个描述。select模型受限于 FD_SETSIZE即进程最大打开的描述符数linux2.6.35为1024,实际上linux每个进程所能打开描数字的个数仅受限于内存大小,然而在设计select的系统调用时,却是参考FD_SETSIZE的值。可通过重新编译内核更改此值,但不能根治此问题,对于百万级的用户连接请求  即便增加相应 进程数, 仍显得杯水车薪呀。

.2select每次都会扫描一个文件描述符的集合,这个集合的大小是作为select第一个参数传入的值。但是每个进程所能打开文件描述符若是增加了 ,扫描的效率也将减小。

.3内核到用户空间,采用内存复制传递文件描述上发生的信息。

4 poll 模型:I/O多路复用技术。poll模型将不会受限于FD_SETSIZE,因为内核所扫描的文件 描述符集合的大小是由用户指定的,即poll的第二个参数。但仍有扫描效率和内存拷贝问题。

5 pselect模型:I/O多路复用技术。同select。

6 epoll模型:

.1)无文件描述字大小限制仅与内存大小相关

.2)epoll返回时已经明确的知道哪个socket fd发生了什么事件,不用像select那样再一个个比对。

.3)内核到用户空间采用共享内存方式,传递消息。

FAQ

1、单个epoll并不能解决所有问题,特别是你的每个操作都比较费时的时候,因为epoll是串行处理的。 所以你有还是必要建立线程池来发挥更大的效能。

2、如果fd被注册到两个epoll中时,如果有时间发生则两个epoll都会触发事件。

3、如果注册到epoll中的fd被关闭,则其会自动被清除出epoll监听列表。
4、如果多个事件同时触发epoll,则多个事件会被联合在一起返回。
5、epoll_wait会一直监听epollhup事件发生,所以其不需要添加到events中。
6、为了避免大数据量io时,et模式下只处理一个fd,其他fd被饿死的情况发生。linux建议可以在fd联系到的结构中增加ready位,然后epoll_wait触发事件之后仅将其置位为ready模式,然后在下边轮询ready fd列表。