python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

时间:2023-03-08 20:38:43
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

Python中的进程与线程

学习知识,我们不但要知其然,还是知其所以然。你做到了你就比别人NB。 我们先了解一下什么是进程和线程。

进程与线程的历史

我们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等。 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构——进程控制块。 进程就是一个程序在一个数据集上的一次动态执行过程。 进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

在早期的操作系统里,计算机只有一个核心,进程执行程序的最小单位,任务调度采用时间片轮转的抢占式方式进行进程调度。每个进程都有各自的一块独立的内存,保证进程彼此间的内存地址空间的隔离。 随着计算机技术的发展,进程出现了很多弊端,一是进程的创建、撤销和切换的开销比较大,二是由于对称多处理机(对称多处理机(SymmetricalMulti-Processing)又叫SMP,是指在一个计算机上汇集了一组处理器(多CPU),各CPU之间共享内存子系统以及总线结构)的出现,可以满足多个运行单位,而多进程并行开销过大。 这个时候就引入了线程的概念。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合 和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。

进程与线程之间的关系

线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。

重点:

如果本重点看不懂,请看下面的到等号分割 之间的内容。

 1 进程:

  优点: 同时利用多核cpu,能够同时进行多个操作

  缺点:耗费资源(每个进程内核都要为其开辟线性地址空间,概念清看下面的)

线程:

  优点:共享内存,I/O操作可以并发,比如爬虫

  缺点:抢占资源,内核会枷锁,容易造成死锁

2 子进程不是越多越好,跟cpu核数相近或处理速度够快,根据处理速度进行启动,太多的话,除了资源浪费,还有进程上下文切换,cpu的中断interrupt,如果涉及到io操作的话还有模式转换

3 子线程也不是越多越好,进程上下文切换,cpu的中断interrupt

  4  计算机最小的任务执行单元:线程。说的微线程:协程 后面有介绍,其实就是控制线程的执行位置

  5  I/O操作不占用cpu时间,而是发生用户内核两种模式转换

6  I/O密集型应用(cpu)   ======= 多线程

计算密集型应用(用cpu) =========多进程

7  python的进程上有个GIL 全局解释性锁,这个会造成,一个进程的多个线程,不能同时使用多个cpu,而是cpu每次只能选一个线程执行,因此,多线程在cpu执行的是无效的。但是在I/O操作的时候是可以同步的,比如time.sleep就是io 操作,多线程,可以同时等待

  主线程

    比如我们写的py文件,执行的时候,所有代码是如何向下执行呢?肯定有个主线程的。

    再我们创建多线程时候,这些线程都是子线程,那肯定有个主线程的。

进程 和 线程的概念

进程和程序关系
进程:程序实例 程序子集 有所谓生命周期,可以kill叼 比如你安装的word 是一个程序 ,你打开一个文档是一个进程,可以关掉。 进程要想完成并发执行的功能就要进程切换
进程切换 ,上下文切换,进程运行,说明在cpu的寄存器里面有数据了。假如5条数据现在有两条,就切换了,现在要保存现场,回来时候 要恢复现场。
如果机器上有几千个进程,会切换 上万个切换需要时间,进程切换时监控程序来完成的,也就是内核,消耗时间 正常程序执行空间是用户空间, 占用在内核,说明大量时间消耗到进程切换。不好。 进程和程序关系
首先我们知道cpu是执行二进制的指令的,而对应的就有两种编程:解释形语言  和 编译形语言 两种编程, c++就是编译成二进制一起执行的,所以c++是编译形   解释型:JavaScript等。 大家自行百度谷歌了解吧。

  所以cpu是执行指令流的。

进程: 由 父进程 fork 自身而来
ls 这个进程的父进程是 shell frok而来 父进程 启动子进程 ,子进程内存空间是在父进程空间,一旦外部加载数据会重新开辟一个内存空间   程序时由指令和数据组成。程序时位于硬盘上的,是死的,只有当内核创建数据结构 ,分配了数据资源,cpu资源,处于活动状态,才有真正的执行价值,才会被拿来一个个被运行。
程序内的指令能不能并行执行?即进程内的多条指令流。单核是不能了。双核的话,要是第一条运行到一半依靠第二个指令的结果,所以不能同时运行的。这是一个执行流的情况下。

cpu在执行程序时,什么叫进程?

    出现内核之前
        进程在发起之前,是一个程序,在磁盘上。比如我们ls 多个用户可以发起多个ls进程。不互相干扰,不会意识到彼此存在。进程是程序的副本。 
        程序本来在硬件运行。但是指令+数据在内存中。 所以cpu会执行一条拿一条,为什么要用内核呢?我们程序在cpu运行,再完之前,不会退出的, 我想让另一个程序在cpu运行,是不行的。
        比如第一个在执行时,可能会产生I/O,过程中比如需要打开一个文件,这个文件在磁盘上很大,而程序必须将这个文件内容加载进内存的这个程序的数据区域之中。硬盘速度很慢。我们cpu这个时间很快,就加载一下就休息一下。cpu就闲置了。
      我们为了解决多个问题,就在cpu执行多任务了,同时执行。内存中有多个程序了。那第一个加载数据了,第二个怎么执行呢?抢过cpu?就打架了,而内存也可能打架,这样就需要一个监控工具在上层。所以内核这个资源调度监控工具。 
        linux 本身就是抢占式多任务。
 出现内核之后
有内核这个监控调度工具:内核就负责这样几项: 以后这个程序再想执行,是直接向内核注册申请的,内核决定,你可以使用cpu,使用2ms,分配给你,不管2ms以后你有没有结束,都要交给另一个程序。这样轮询交替,让用户看起来是同时运行的。其实在同一时间只能有一个程序来占用一个cpu运行的。
        所以,进程的启动调度都是由内核完成的。
在没有 内核之前,程序运行在内存。是作业。
后来为甚么叫进程呢?
        内核必须把有限资源分配个各个贪婪的程序,会把cpu切割成片,时间片,我们cpu是按照时间流逝进行完成的。
        内存是拿空间用来存储。cpu是时间
内核也是程序,所以有内核空间,用户程序,在用户空间,用户程序的单独空间是可以伸缩的。都直接把程序的空间连续起来是有弊端的,伸缩,会导致很多内存碎片。以后再想找连续空间就没了
内存单位:页面
     我们大多数内存空间叫分页内存空间,跟我们磁盘一样,也是分块的。叫内存页面 而能存数据的叫内存页框 page frame
内存分页:
MMU:内存控制单元,其实是星型方式才具备的组件,尤其是x86架构。在arm上有没有不知道。主要是以x86架构的cpu说明的
mmu:memerry manage unit
mmu 出现的主要目的是为了实现内存分页 我们应该知道,一个程序被加载到内存,并不是所有指令在运行,比如ls /etc 跟ls -l /etc 这个ls 的执行代码都不一样。
所以我们的ls 程序不是所有指令都加载进去的。需要-l再把-l的指令加载进来。 数据量不同。存的页框不同。需要1个我们分1个,需要2个分2个页框,不够了,找其他页框,即便不连续也可以,最好连续。
只要我们内核追踪着这个进程知道这个内存页框就行。

问题:如果说进程,所使用内存空间不是连续的内存页框,进程一定以为我们内存是连续的内存空间,进程是不知道分配到其他页框了。 如何让内存识别不到呢?

        我们内核,都给进程虚拟出了一段假的内存空间,让进程看起来是连续的。
        一个进程最多能识别多大内存呢?
            32位  4G
            64位  4G^4G
        一个程序的4G内存空间,它的所有执行操作都是通过调用内核中的系统调用来完成的。
        后来编程的时候不是在cpu上使用指令集编程的。而是使用系统调用。
        所以进程是知道内核存在的,所以进程要跟内核和平共处。

线性地址空间:

        因此32位的这4g虚拟 空间,有1g分给内核,剩下的程序自己用。而实际上我们的物理内存只有512M。但是为了统一管理,为了使得进程能够以统一风格,任程序员编程,不管你是512M物理内存还是1g物理内存,都是虚拟出4g内存,1g分给内核,其他分给程序的。这段空间是假的,是内核虚拟给进程的。这段地址空间被称为线性地址空间,看上去连续的,其实不连续。
        我们的3g开始是程序的执行入口的,开始一段是记录文件的格式等内容,前面是为空的
对于一个进程来讲,
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
    真实内存比如512M ,但是会虚拟出 4G线性空间给每个进程,
    每个进程都会以为4G虚拟空间,但是 不会用4G,该用多少用多少。
怎么使用这4G空间呢?
    刚一直在讲,程序是由指令加数据组成的,这个空间内放着程序的指令和数据,除了内核空间外。光有指令和数据 还不够,实际上在物理内存中是可能不连续的页框,线性地址空间的对应指令指向物理内存的不连续的页框。这样单个进程看到的虚拟空间只是 自己单独存在的。各自不互相影响。

swap  内存的缺页异常  大的异常

这样总有一天内存耗尽了。就会将内存页面页框内的 数据转移到我们硬盘上,叫做swap交换分区。
用于临时存放内存中暂时用不上的内存页面。用的什么算法呢?lru算法,lru:最近最少使用的内存页面数据。
但是如果人家进程醒了,访问被拿走的数据,内核会阻止它,告诉它没有,把它的拿回来从swap内,再把别人的腾空放到swap内。这就是所谓的缺页异常。
我找的页面不在了。我的内容本来在内存中,但是内存中不在了。这种情况就会产生I/O。为什么产生I/O呢?因为内核要从磁盘上抢回数据,从swap内。与磁盘操作就产生I/O,这就是大的异常。

小的异常  内存映射,mmap

我们知道,程序在执行时,都是动态链接的方式链接到好多个库上的。如果程序运行要依赖这个库,就会把这个库加载到内存上来的,因为程序的某些功能要依赖这个库的执行来实现的。
但是这个库叫so ,共享对象,多个进程可以共同使用的。比如进程1 需要 lib1加载到内存的一个页面上。进程2启动也需要依赖lib1,库文件都是执行的代码,不会改变。
进程2 通知内核加载lib1,由于lib1已经由进程1加载到内存在,不会再次加载,但是进程2没有虚拟内存对应的页面,就会出现缺页异常,这时内核就会说,等一下,内存中有已经加载了lib2,安抚一下,
(当然不是其独享的。如果其他进程需要依赖某个库,就会映射到哪个进程的空间) 将加载的页面告诉进程2.
而lib2加载到内存的空间由两个进程共享,都以为是自己的。但是不会 随着一个 结束而释放,只有全部释放才 释放。而这段共享空间是,内存映射,mmap==memery map。
这段内存是共享内存空间,是映射到进程里面去的。

内存泄露:

对于linux用户空间中,init是所有进程的老祖宗。除了init之外,其他进程如果需要其他任务,就会创建子进程。子进程都是由其父进程通过向内核fork系统调用复制的,父进程通过自身fork一个一模一样的子进程除了。
由子进程完成特定任务,一旦子进程完成任务,就清理子进程。kill子进程,终止子进程 一旦父进程都over了,如果还有子进程。所有子进程都成孤儿了。所有进程的父进程是init。父进程在over之前,要给子进程找一个新爹,即监护进程。
如果没有给子进程找新爹,这个子进程就成孤儿了,所有的孤儿进程就会被回收了,无法被回收占用的内存就无法被回收了,无法被分配了。这样分配出去的内存无法回收旧造成内存泄露了。这就找程序员了。
程序问题。重启可以,内存重新分配。 因为我们无法追踪孤儿进程,无法kill。 linux用户空间中,init是所有进程的老祖宗。除了init之外,其他进程如果需要其他任务,就会创建子进程。子进程都是由其父进程通过向内核fork系统调用复制的,父进程通过自身fork一个一模一样的子进程除了

task_struct:

就算是父进程kill杀掉子进程,也会像内核去申请终止 才能终止。因此,在内核中为进程创建了信息库,户口簿,
记录了每一个进程的 进程号,进程名 进程父进程 进程使用的cpu时间累积 分配的内存也,父进程编号,占用了哪些cpu 等等。
监视无处不在。这些对于linux内核叫 task_struct,任务结构。每一个任务都有一个结构,所以每创建一个子进程,都会向内核注册一个任务,
内核创建一个任务结构,来保存任务的各相关属性信息,内核从而追踪他所使用的各相关资源了。

双向循环的链表

内核这种任务有n个,那内核是如何找到每一个呢?
这些数据结构,是c实现的。python简单的多。单c的效率高。
着每个结构对编程语言来讲是数据结构。描述 数据的属性信息。 进程号 id 父进程等属性信息。这种描述进程格式叫数据 结构。这种数据结构用c描述要复杂的多。
每个进程有一个任务结构,那我们内核怎么知道有多少个呢,如果方便的找到任意一个ne ?
而我们的linux 的内核也是程序,而程序也要 存储到内存当中。内核也要在cpu运行之后,才能存到内存中的。内核要自身追踪这些结构,是通个c语言的双向循环的链表 实现的。每个结构也有结构的编号。
第一个结构尾部记录第二个结构的内存地址编号。而第二个结构首部 存有第一个结构的地址编号。 没有进程查看工具之前,我们要查看系统有多少个进程,就遍历上图的列表计算一次。

进程属性在linux系统的存放位置:

内核不能随便访问。所以我们需要伪文件系统。所以每个进程的进程号在proc下。这些个进程号下存着各种属性数据。启动命令,那个cpu运行过,哪些内存。

进程管理工具实现:
我们所谓的进程查看工具,就是获取为文件系统的进程属性的。 内存中,有1g是我们内核空间的,用户空间存放我们要运行程序的指令,cpu执行指令的方式有三种,顺序、循环、选择这三种执行。 比如一个进程申请的4g空间,内核占1g,而程序指令在用户空间,cpu依次读取进行,当执行到一个点无法完成,就会生成一个子进程,由于同一时间1个cpu只能做1件事情,cpu是时间片切片机制,
这时父进程就会休眠。而子进程又会申请开辟一个4g的线性地址 空间,cpu又会继续执行子进程的指令,执行完后,结构返回给父进程。
即使有两个cpu 程序也不能即使执行。因为父进程等着子进程返回结构才能继续工作。

线程: thread

    程序时由指令和数据组成。程序时位于硬盘上的,是死的,只有当内核创建数据结构     ,分配了数据资源,cpu资源,处于活动状态,才有真正的执行价值,才会被拿来一个个被运行。
    程序内的指令能不能并行执行?即进程内的多条指令流。单核是不能了。双核的话,要是第一条运行到一半依靠第二个指令的结果,所以不能同时运行的。这是在一个执行流的情况下。
        那我们两个cpu干什么呢?可以执行多个进程啊,虽然不能执行一个进程的多个指令
分成n个执行流,每个流内都有指令和数据,是不互相干扰的。
这样就需要一个进程空间内部,并行执行。一个进程内部有多个执行流来并行操作,但是出现资源争抢线性
线程是一个进程内部的多个执行流
多个线程不能同时打开一个资源文件,而lib尽管可以共享,也是在内核级别存在的。
资源发送争抢的区域叫临界区。
单进程、单线程:一个进程内存空间内有一个执行流。
单进程、多线程:一个进程内的指令集分成多个执行流 不管你的一个进程空间内部有多少个执行流,说白了就是多少个线程,但是你有一个cpu,多个线程没有什么明显的地方,除了麻烦以外。
多个cpu就有好多好处了。 web服务器工作模型及短板。:
web服务器:web服务进程。有一个用户访问时,我不能直接访问,否则有问题,那第二个来访问怎么办,只能等着。
怎么办呢?给每个用户生成一个子进程,每个用户一个子进程,一个用户需要10M内存,1000个用户就是10g。还要涉及到进程切换。
但是如果访问主页,主页这个内存空间的话,每个用户这一个子进程就要开辟一块,那我们的服务器资源都要撑坏了。所以为了让众多用户共享一个资源来说,我们就要采用线程模型。
web服务比如启动一个进程,里面分配多个线程,这样我们就打开一份数据就行了。cpu多的话就好了。
但是多个I/O争用的。虽然我们cpu多个,里面处理数据很快,但是我们网卡就有一个,还是会堵在网卡队列这里。
所以我们要理解原理,我们就知道系统瓶颈短板在哪了。 对于linux 而言,不是线程操作系统。windows solorias 都是微内核的线程操作系统。linux是单内核的。 线程跟内核本身的关系并不大,是由线程库实现的。迄今为止,linux内核中维持的每一个线程,跟进程毫无区别。

LWP

就算你生成了n个线程,但是在 linux看来跟进程是一样的。只不过是占用内存大小变小,称为轻量级进程。LIGHT WEIGHT PROCESS
linux 原生内核不支持线程,好多爱好开发者提供了好多实现方式,linux内核实现一套   红帽  一套   其他一套  三种实现需要线程库的支持。

 线程死锁

1
2
3
4
5
线程不一定是好事,多个线程不能同时抢占一个文件。内核会给资源枷锁。
线程资源浪费:
    死锁:12的资源 21的资源,都在等着释放。
           1 等着2拿的资源,cpu过一下看一回,一直cpu看,浪费时间。
    liunx内部有自选锁。很复杂

计算机基础

库 和api
再在其上我们需要开发各中应用程序。但是这些应用程序可能需要一些公共功能,我们不能全部单独为其开发这些功能都重新开发一边,对于程序人员来说工作量大不说,整个系统软件本身体积也比较庞大。
因此我们把这些库抽取出来,做成了库,这些库表现出来的接口,我们成为api,应用二进制接口。 用户态 内核态
对于应用程序,我们有个独特的称呼,不管其有没有库,只要这些应用程序,不是作为操作系统本身一部分而存在的,我们就称为运行用户空间的程序,也叫用户态。
而需要在内核空间运行的程序,内核就是我们的操作系统,我们称为内核态,核心态,也叫内核空间。 在众多应用程序当中, 每一个程序要想运行起来,它最终是向内核发起系统调用来完成的,或者某些程序某一部分需要内核参与,某些不需要内核参与。 比如 计算1+1=2 这个需要在内核空间运行吗?这各指令
就像你在shell脚本中计算1+1=2这些计算指令,通常不会发起系统调用的。因为你不需要内核坐特权操作,不会发起特权指令。 我们知道cpu的架构大概分成了4各层次。环0 环1 环2 环3.
环0 是特权指令 :操作硬件 控制总线
没有做特权级操作则不会调用环0 模式转换 一旦发起系统调用,就是调用某个函数,如果调用是内核的某个函数功能,而不是用户空间的功能,这意味着cpu接下来将转为特权模式下执行。特权模式下执行的一定是内核的代码 。
这其实意味着,程序执行这需要某个特权操作,cpu需要转到内核模式的特权模式下,将指令执行的结果返回给程序,而这个程序继续操作向后进行,回到用户空间。所以有一部分操作,程序是无法自己完成,程序是没有权限操作的。
需要向内核发系统调用。到内核空间,执行特权操作 。然后将指令结构返回给程序,然后回到用户空间,程序继续执行。这样就是模式转换。 从用户模式到内核模式,从内核模式到用户模式。内核模式是用户模式需要完成的某些特权操作的模式。一个用户所需要的特定操作,通常情况下,一定是个应用程序在用户模式中所需要的指令来完成的。
所以我们说过,操作系统的所产生的生产力,通常是看,是否在用户模式空间占用的大量空间。所以程序的生产力是看是否在用户空间占用了大量的时间,而频繁切换或者在内核空间占用大量的时间就是非常浪费 的。 内核调度程序在cpu执行
所谓的一个程序的执行一定是内核调度程序到cpu上执行的。那我们那些程序能执行,那些程序不能执行呢? 人和操作系统接口 :shell
比如我们外存上也就是硬盘上上有三十个应用程序我们一开机有三十个程序,难道我们一开机都需要开启吗??
当然不是,一些需要系统必须的程序,我们可以开机自动运行,但是某些需要我们需要时运行,那我们怎么通知内核让内核调度运行起来呢?这样我们就需要一个人机接口shell,
这个人机接口,用户通过shell接口可以跟操作系统打交道。这个程序可以 程序请求提交给内核,进而内核给程序开放有赖于生存的基本条件,从而程序执行起来的。。所以说,没有shell的操作系统,程序可以运行起来吗?
可以,只不过我们无法通过接口跟系统通知启动程序了。 计算机基础 模式转换 系统调用
 MMU cpu的三个单元 寄存器 上下文 task_struck 线程 临界区 缓存命中
MMU
操作系统基本工作体系的是比价简单的:运算器控制器被独立到cpu上,有一个独立的芯片叫MMU :内存控制单元。
还有: 存储器 -- memeroy
显示设备---vga
键盘设备---kb
硬盘设备---由硬盘控制器或者适配器链接到主机上的
这些部件如何交互呢?需要线来链接起来 通过星型方式,有总部完成数据交互
早期是主线方式。早前不采用星型方式就是太麻烦,让计算机体系变得非常复杂。每个设备都要通过cpu延伸一根或者数跟链接线。所以这些设备都链接到同一根总线上。用来交互
既然是总线,在某一个时刻被某一个设备使用了,cpu跟内存交互,我们vga就无法工作了,其实总线的频率很高,一般总线频率不是瓶颈所在。 内存分页:
MMU:内存控制单元,其实是星型方式才具备的组件,尤其是x86架构。在arm上有没有不知道。主要是以x86架构的cpu说明的
mmu:memerry manage unit
mmu 出现的主要目的是为了实现内存分页 cpu的三个单元: 在这几个部件中cpu是至关重要的。主要工作:在内存中取出指令并运行,在每个cpu运行时钟周期内,取出指令,解码指令,确定其类型和操作数并执行。cpu主要功能就是从内存中装载指令或者数据。必须完成解码。
cpu必须要有不同的组件完成这三个动作:取出指令 解码指令 执行指令
第一个组件:取指单元
第二个 解码单元
第三个 执行单元
寄存器:
任何指令的执行都需要三个时钟周期: 由于内存比较慢,第一个步骤:取出指令,每个cpu都有临时存储指令的设备。我们成为寄存器
指令计数器:
多个指令怎么排序计算先后,需要一个指令计数器,这样就可以将指令指向下一个指令在内存中的位置,这样就连起来
这样程序计数器,堆栈计数器,等组件都是跟cpu的频率是相同的,在同一个时钟周期下
这些都称为cpu的寄存器。
上下文
cpu经常会终止挂起,切换执行另一个程序。这叫做上下文。
上下文切换
将某个程序终止挂起的时候,操作系统必须将程序的状态必须保存下来,因为这个程序执行到某条指令的寄存器,另一个进程也需要这个寄存器,但是当原来的程序切换会来的时候,原来的指令位置所在的寄存器的状态要保持原来样子。
我们将进程的指令指针计数器等等状态保存下来就称为保存现场。cpu重新装载这个进程回来的时候,就意味着保存的数据重新恢复。这就叫做上下文切换 上下文切换是需要成本的,保存现场和恢复现场是需要时间成本的。 这些状态数据是保存到内存当中的 在内核当中维护的一个任务结构task_struck的内存结构, 包括程序进程的id 父进程的id 状态 属性等。当发生进程切换的时候,内核就会将挂起的进程的各种状态保存到任务结构,
在内存当中,当需要恢复的时候,就重新读出来。 比如httpd的prefork模型,httpd进程,1000个链接进来,每一个请求,代表1000个进程,每一个用户相应的时候,要进行快速切换。这样切换过程中就会浪费大量上下文的切换的时间。
ngnix 不用切换,一个进程相应多个请求,所以省去了大量上下文切换的时间。
要完成这种操作,必需要实现将某个进程绑定到cpu上,用来实现cpu的 cpu affiliated ,跟cpu 和进程的调优操作,主要优化的主要目标之一是,cpu的 侵缘线 cpu affiliated cpu三个处理步骤:取指 解码 执行, 如果10个指令,就需要三十个步骤。
位置让这个更快呢?
每一个让一个组件执行?
取指单元取完第一个指令马上取第二个指令不等待后两个步骤执行完成,当解码单元执行完成后,取值单元的第二个指令的步骤也完成了,因为都是同样的时钟周期。再交给下一棒
五个周期完成所有操作。事实上更快。
事实上 cpu的取值单元 不止一个,解码单元也不止一个,因此会并行处理指令
cpu 处理工艺 保证18个月晶体管数量翻一番,interl 创始人摩尔保证。
目前工艺制造工艺无法超过4ghz,但是如何保证计算机处理能力18个月每隔就翻一番呢?
多核心,也叫超线程。也就是多线程 hyperthread。
多线程在根本上讲,其实是 一颗cpu上同时在两个线程之间切换,在cpu内部引入一个类似寄存器的设备。只不过是在同一时间一个cpu核心运行两个线程,所以叫超线程模式 对于cpu的核心,超线程,也就是cpu在一个芯片看到的就相当于两个线程芯片。比如i3 双核,在物理核心是2个,但是逻辑核心看到的就是4核心。
cpu在同一个时刻可以执行两个线程,一个大脑可以想两件事。 win 任务管理器看到的就是逻辑 发展到今天,cpu发展到多核心模型。cpu性能提升早起是工作频率,现在提升很困难,后来频率提升改为多核心。 因为在时钟周期频率分割很多。后来提升不上去了。就加芯片。并行执行。 一个进程在执行时候只能使用1个核心。
如何让其使用更快呢?我们知道系统中不止有1个进程,我们多个核心,就可以同时运行多个任务进程了。
线程 临界区
特殊场景:
假如,众多进程中,只有几个处于繁忙中,比如web服务器,1个web进程很忙,其他进程很闲。4个核心。这样其他的核心不能充分利用,那如何呢?并行编程。
如何让1个进程运行到多个cpu核心上呢?? 我们经指令流做成并行执行流,每个执行流就是一个线程,比如一个程序400个指令,100个指令为一个指令流,就可以分到4个核心上了。这就叫并行编程的多线程。
进程如果不做并发编程时候,进程之间是毫不相干的。并行的情况下,一个进程多个线程,的指令之间可能相互依赖,运行结果。
linux多个核心就有问题了,
线程就是进程并行执行的指令多个执行流
线程是共享进程打开的文件的。 如果是多个线程时候,第一个线程打开文件的时候,有可能第二个线程需要这个文件。我们称为资源争用频繁的地方较为临界区。
所以作为程序员,我们要精确使用计算,否则多线程,不但不能提升性能,反而发生在临界区争用,反而会导致程序性能更差。 而在web服务器中,一个进程内部生产n个线程,尤其是在读操作的时候,大多是get,每个进程相应一个请求,进程之间是不相干的。 比如读的时候,一个进程打开文件,第一个进程可以读,第二个照样可以读 缓存命中:
linux进程本来就很轻量级,而线程在内核看来照样是进程,所以调度也按照线程。但是多核心就会有问题, 比如 当一个进程在1号cpu核心执行时,
当内核分配给该进程在cpu运行的时间耗完了的时候,内核就将这个进程挂起,当再次恢复该进程时候,不是在1号cpu了,而是在2号cpu。这就造成了一个问题,就是缓存难以命中。
事实上,为了保证cpu核心被进程是负载均衡使用的,内核在管理cpu时,每个cpu都有执行队列,有两个,1:等待运行的队列 2 运行完就是过期队列
如果没运行完,当运行队列中的执行完后 ,就将过期队列跟运行队列反过来再运行一遍。 比如两个核心,同时两个核心都有100进程,在一个时钟周期内,但是下一个周期,1号还有98个,但是2号还有2个,内核就会rebanlance 重新分配,降低cpu负载,但是均衡结果是缓存无法命中了。
所以我们以后cpu使用优化,要进程绑定,目的是提高缓存命中率的。
但是1颗cpu太忙了,我们如和?多颗cpu即可
这种机制就叫做smp机制,对称处理器。
cpu管理时,要使用多颗,就要有多个插槽,我们称为cpu socket,比如1个插槽插1颗cpu,1颗cpu有两个核心。有4个插槽,主板就支持8核 MMU cpu的三个单元 寄存器 上下文 task_struck 线程 临界区 缓存命中

线程: thread

程序时由指令和数据组成。程序时位于硬盘上的,是死的,只有当内核创建数据结构     ,分配了数据资源,cpu资源,处于活动状态,才有真正的执行价值,才会被拿来一个个被运行。
程序内的指令能不能并行执行?即进程内的多条指令流。单核是不能了。双核的话,要是第一条运行到一半依靠第二个指令的结果,所以不能同时运行的。这是在一个执行流的情况下。
那我们两个cpu干什么呢?可以执行多个进程啊,虽然不能执行一个进程的多个指令
分成n个执行流,每个流内都有指令和数据,是不互相干扰的。
这样就需要一个进程空间内部,并行执行。一个进程内部有多个执行流来并行操作,但是出现资源争抢线性
线程是一个进程内部的多个执行流
多个线程不能同时打开一个资源文件,而lib尽管可以共享,也是在内核级别存在的。
资源发送争抢的区域叫临界区。
单进程、单线程:一个进程内存空间内有一个执行流。
单进程、多线程:一个进程内的指令集分成多个执行流

模式转换:内核模式  用户模式

为了避免用户的进程操作系统资源,cpu是有保护进制的。cpu把它能操作运行的指令时分4类。分别放在环0 1 2 3 上。
用户进程只要不是以内核方式开发的,是以自己编程方式开发的基本程序,是不能调用特权指令,一旦尝试调用cpu的特权指令,cpu就会唤醒内核的。而普通程序要想访问硬件,就要访问特定的程序接口访问特权指令,要通过系统调用通知内核。
库调用,系统调用。
库调用是执行库的代码的,而系统调用是执行内核代码的。
程序运行,到内核运行 在回到用户空间这就叫做模式转换。 所以产生系统调用在内核浪费的时间越多,cpu越浪费。所以大部分时间70%应该浪费到用户空间,才会产生生产力
比如我们的ftp服务器,只有在访问硬盘数据资源,网卡封包解包等才会执行内核模式操作。所以浪费在内核模式的程序大部分是系统程序。而用户工作程序,应该大部分时间需要浪费在用户空间。

进程切换:   上下文切换  进程的环境切换

cpu只有一颗,在同一时刻,cpu运行的进程只有一个,不能让一个进程一直占用cpu,我们一定要让其及时切换交给其他进程。
寄存器:保存当前进程的执行状态码
1级缓存 2级缓存 3级缓存 内存 硬盘 而进程切换,我们要把寄存器当前的执行状态码,保存下来。保存现场。
每个进程,内核都会在内存维护一个task_starck 任务结构,用于保存进程的属性状态信息。一旦切换进程,就会保存当前状态。再把另一个进程,本来保存在内存的任务结构的状态信息,从新交给cpu执行。在这个切换时间内,cpu并没有干活。所以这个切换时间越快越好。
那是越频繁越好呢?? 大量时间cpu都浪费到来回切换了。

时钟中断:

当然内核也有内核工作频率的,靠时钟驱动的。比如内核100hz/s   1s 震荡100次。内核工作hz 200 500 1000  s是越快越好吗?
每次HZ翻转时,时钟震荡翻转时,都会产生时钟中断发生1s,linux是抢占式多任务,如果中断就会发生抢占在时钟中断周期走完时。 cpu时间走完了,或者内核分配的时间走完了。 为什么会抢?
进程是有优先级的。
进程调度,是内核的核心功能之一。要公平合理。对于那些需要占用特权资源的。紧急的进程优先级高。 公平:
结果公平
七点公平 进程的调度算法非常重要:
统筹方法 程序=算法指令+数据 结构 优先级高的排前面,低的排后面。排1对合适吗?
我们按照优先级排队,优先级相同的排一队。在算法过滤 Big O :算法优先的判断工具
O(1)
横 队列长度 纵是扫描时间

=========开始正文 Python内的线程进程协程模块实现==== 

threading模块

threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

创建线程的两种方式

1 第一种创建线程的方式 创建20个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import time
  
def worker(num):
    """
    thread worker function
    :return:
    """
    time.sleep(1)
    print("Thread %d" % num)
    return
  
for in range(20):
    = threading.Thread(target=worker,args=(i,),name=“t.%d” % i)
    t.start()

2 第二种创建线程的方式 创建20个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyThread(threading.Thread):
    def __init__(self,name):
        # threading.Thread.__init__(self)
        super(MyThread,self).__init__(target=self.fun,name="t %d" %i)
        self.name = name
    def fun(self):
        time.sleep(2)
        print("name %s thread %s" % (self.name,threading.current_thread().name) )
for in range(20):
    = MyThread(i)
    t.start()

多线程的说明

threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。python当前版本的多线程库没有实现优先级、线程组,线程也不能被停止、暂停、恢复、中断。

threading模块提供的类:  
  Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

threading 模块提供的常用方法: 
  threading.currentThread(): 返回当前的线程变量。 
  threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 
  threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

threading 模块提供的常量:

  threading.TIMEOUT_MAX 设置threading全局超时时间。

Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# coding:utf-8
import threading
import time
#方法一:将要执行的方法作为参数传给Thread的构造方法
def action(arg):
    time.sleep(1)
    print 'the arg is:%s\r' %arg
for in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.start()
print 'main thread end!'
#方法二:从Thread继承,并重写run()
class MyThread(threading.Thread):
    def __init__(self,arg):
        super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
        self.arg=arg
    def run(self):#定义每个线程要运行的函数
        time.sleep(1)
        print 'the arg is:%s\r' % self.arg
for in xrange(4):
    t =MyThread(i)
    t.start()
print 'main thread end!'
创建线程的两种方法

关于python多线程编程中join()和setDaemon()的一点儿探究

 

关于python多线程编程中join()和setDaemon()的用法,这两天我看网上的资料看得头晕脑涨也没看懂,干脆就做一个实验来看看吧。

首先是编写实验的基础代码,创建一个名为MyThread的 类,然后通过向这个类传入print_func这个方法,分别创建了两个子线程:

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#!/usr/bin/env python
import threading
import time class MyThread(threading.Thread): def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name=name
self.func=func
self.args=args def run(self):
apply(self.func, self.args) def print_func(num): while True:
print "I am thread%d" % num
time.sleep(1) threads = []
t1 = MyThread(print_func, (1, ), print_func.__name__)
threads.append(t1)
t2 = MyThread(print_func, (2, ), print_func.__name__)
threads.append(t2)
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

首先来试试setDaemon()的设置,在上面基础代码的后面加上以下的代码:

for t in threads:
t.setDaemon(True)
t.start() print "ok\n"

程序输出:

I am thread1

I am thread2

ok

print_func()中的while循环没有继续执行下去就退出了,可见由于setDaemon(True)把子线程设置为守护线程,子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print "ok\n"后,没有等待子线程,直接就退出了,同时子线程也一同结束。

下面我们把添加的代码更改如下:

for t in threads:
t.start()
t.join() print "ok\n"

这个时候程序会输出:

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

。。。

这样一直循环下去,可见只有第一个子线程被调用了,第二个子线程,以及父线程都没有继续走下去。这里我的理解是:join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。,无法运行下去。在这里父线程没法继续执行for循环,所以第二个子线程也就不会出现了。

接下来再修改一下代码:

 

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
for t in threads:
t.start() for t in threads:
t.join() print "ok\n"
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

这时程序输出:

I am thread1

I am thread2

I am thread1

I am thread2

I am thread1

I am thread2

I am thread1

。。。

可见这个时候两个子线程都在运行了,同样在两个子线程完成之前,父线程的print "ok\n"都不会执行。

之前的实验中,两个子线程的运行时间是一样的,那么假如两个线程耗时不一样,一个子线程先于另一个子线程完成执行,会发生什么情况呢,于是我增加了一个什么都不干的方法pass_func:

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#! /usr/bin/env python
#coding=utf-8
import threading
import time class MyThread(threading.Thread): def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name=name
self.func=func
self.args=args def run(self):
apply(self.func, self.args) def print_func(num): while True:
print "I am thread%d" % num
time.sleep(1) def pass_func():
pass threads = []
t1 = MyThread(print_func, (1, ), print_func.__name__)
threads.append(t1)
t2 = MyThread(pass_func, (), pass_func.__name__)
threads.append(t2) for t in threads:
t.start() for t in threads:
t.join() print "ok\n"
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

这段代码的执行结果是:

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

I am thread1

。。。

可见这时侯,一个子线程的完成不会影响另外一个子线程,父线程仍然一直被阻塞着。

有些时候,还可以单独对某一个子线程设定join(),以达到特定的效果,例如下面这段代码片段的用意是,分别设立一个子线程用于接收数据,另外一个子线程用于发送数据,当用户输入“quit”时,程序退出:

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
def send(sck):
while True:
data = raw_input('>')
sck.send(data)
if data == "quit":
sck.close()
break def recieve(sck):
while True:
data = sck.recv(BUFSIZ)
print data, "\n" threads = []
t1 = threading.Thread(target=send, args = (tcpCliSock, ))
threads.append(t1)
t2 = threading.Thread(target=recieve, args = (tcpCliSock, ))
threads.append(t2) for t in threads:
t.start()
t1.join()
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

这段代码中,把t1,也就是send所对应的子线程添加上join()属性,让父线程在send方法对应的子线程结束前一直在阻塞状态。假如把两个子线程t1和t2都添加上join()属性,这会使得send方法收到“quit”命令,退出循环后,由于recieve方法仍然在循环当中,父线程仍然被阻塞着,结果程序无法退出。只对t1添加join()属性,那么t1结束了,父线程会继续执行下去,直到执行完最后一条代码后退出,然后t2也跟着一同结束,达到所有线程都退出的目的,这就是t1.join()这条命令的用意。

 

thread方法说明

t.start() : 激活线程,

t.getName() : 获取线程的名称

t.setName() : 设置线程的名称

t.name : 获取或设置线程的名称

t.is_alive() : 判断线程是否为激活状态

t.isAlive() :判断线程是否为激活状态

t.setDaemon() 设置为后台线程或前台线程(默认:False,默认是是前台进程);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

t.isDaemon() : 判断是否为守护线程

t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

t.run() :线程被cpu调度后自动执行线程对象的run方法

join 代码 

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'liujianzuo' import time,threading def f1():
pass def f2(arg1,arg2):
time.sleep(3)
print(4+5)
b = time.time()
print(b - a)
f1()
a = time.time() # join join(timeout=None) 默认timeout无值的时候,子线程会执行完再往下执行
t = threading.Thread(target=f2,args=(1,2,))
t.start()
t.join()
t = threading.Thread(target=f2,args=(1,2,))
t.start()
t.join()
t = threading.Thread(target=f2,args=(1,2,))
t.start()
t.join()
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

test 2   

 重点: 多线程传参数 args  1个参数的时候 内部一点要逗号结尾,否则报错。不认为是元祖

1
2
3
4
5
6
7
8
9
10
11
12
# _*_coding:utf-8_*_
# multi-thread
import  threading,time
def run(num):
    print 'hi ,i am the thread, ',num
    time.sleep(1)
for in range(20):
    = threading.Thread(target=run,args=(i,))  #i 不加逗号是代表一个参数,加逗号是代表一个数组
    t.start()

先执行 并发线程再执行 下一个主线程

#_*_coding:utf-8_*_

import threading
import time
def run(num):
global NUM time.sleep(1)
print " hi i am thread %s ...lalala " % num
NUM += 1 NUM =0
p_list = [] for i in range(30):
t = threading.Thread(target=run,args=(i,))
t.start()
p_list.append(t) #我们自己实现并行,先让并发线程执行,加到列表等待我们取结果即可
#t.jion() # 等待一个线程结束才会执行第二个线程,这样就成了串行,而不是并行了 for i in p_list: #
i.join() #取出我们上面放入的结果。 但是串行取出 print '---->',NUM #由于加入列表时候是并发加入的,数字没有先后,所以打印结果i的时候也就看到没有顺序。最后打印的的i会导致 NUM的变化

既不让NUM变化又实现并发 而且还是先实现并发线程后再执行 主线程 也就是再执行print NUM

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#_*_coding:utf-8_*_

import threading
import time
def run(num):
global NUM
lock.acquire() #上锁 注意位置,理论上是要 上锁 处理数据 解锁。一定要看好sleep时间 print " hi i am thread %s ...lalala " % num
NUM += 1
lock.release() #释放锁
time.sleep(1)
NUM =0
p_list = []
lock = threading.Lock() #制造一把锁 for i in range(30):
t = threading.Thread(target=run,args=(i,))
t.start()
p_list.append(t) #我们自己实现并行,先让并发线程执行,加到列表等待我们取结果即可
#t.jion() # 等待一个线程结束才会执行第二个线程,这样就成了串行,而不是并行了 for i in p_list: #
i.join() #取出我们上面放入的结果。 但是串行取出 print '---->',NUM #由于加锁了,所以会数字不会变化
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

信号量 semaphore  讲解: 区别一个数据一个线程即一个厕所一把钥匙 一个厕所5个坑 5吧钥匙

控制数据库链接

    semaphore = threading.BoundedSemaphore(5) 
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#_*_coding:utf-8_*_
__author__ = 'jianzuo' #_*_coding:utf-8_*_ import threading
import time
def run(n):
semaphore.acquire()
time.sleep(1)
print "run the thread :%s\n " % n
semaphore.release() if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(5) for i in range(30):
t = threading.Thread(target=run,args=(i,))
t.start()
while threading.active_count != 1 #一个程序至少有一个进程一个线程。所以等于1 就会结束
pass
else:
print '-----all thread done-----'
print '---->',num
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

守护线程 守护线程 守护线程程宕掉,它所产生的子线程没有存在意义 用于控制子线程

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
import time,threading

def f1():
pass def f2(arg1,arg2):
time.sleep(3)
print(4+5)
b = time.time()
print(b - a)
f1()
a = time.time() # 上面默认是 setDaemon(False) 即 主线程会handle住,一直等待多线程执行完毕
# 如果不想让 主线程等待子线程,那么我们直接在start之前 改t.setDaemon(True) 即可 t= threading.Thread(target=f2,args=(1,2,))
t.setDaemon(True)
t.start()
t = threading.Thread(target=f2,args=(1,2,))
t.setDaemon(True)
t.start()
t = threading.Thread(target=f2,args=(1,2,))
t.setDaemon(True)
t.start()
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

守护线程脚本讲解    

我的一个main_thread 主线程产生10个子线程,如果主线程不是守护进程,他们都是并发,执行完才会走下一个主进程,如果设置为守护进程,一旦执行下一个主线程,代表main_thread结束,其他线程执行多少算多少。

python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享
#_*_coding:utf-8_*_
__author__ = 'jianzuo' import threading
import time
def run(num):
if not num == 5:
time.sleep(1)
print " hi i am thread %s ...lalala \n" % num def main(n):
print "------running main thread----------" for i in range(10):
t = threading.Thread(target=run,args=(i,))
t.start() #time.sleep(3)
print "------done main thread----------" main_thread = threading.Thread(target=main,args=(10,)) #主线程产生10个子线程 main_thread.setDaemon(True) #将主线程设置为守护线程 main_thread.start()
time.sleep(2) print '\n------->>>>>' #顶格的都是主线程
python基础-12 多线程queue 线程交互event 线程锁 自定义线程池 进程 进程锁 进程池 进程交互数据资源共享

Lock、Rlock类


  由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。

Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

简言之:Lock属于全局,Rlock属于线程。

构造方法: 
Lock(),Rlock(),推荐使用Rlock()

实例方法: 
  acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。 
  release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

例子一(未使用锁):

#coding:utf-8
import threading
import time gl_num = 0 def show(arg):
global gl_num
time.sleep(1)
gl_num +=1
print gl_num for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start() print 'main thread stop' 未使用锁

未使用锁

main thread stop
9 Process finished with exit code 0 多次运行可能产生混乱。这种场景就是适合使用锁的场景。 运行结果

运行结果

例子二(使用锁):

# coding:utf-8

import threading
import time gl_num = 0 lock = threading.RLock() # 调用acquire([timeout])时,线程将一直阻塞,
# 直到获得锁定或者直到timeout秒后(timeout参数可选)。
# 返回是否获得锁。
def Func():
lock.acquire()
global gl_num
gl_num += 1
time.sleep(1)
print gl_num
lock.release() for i in range(10):
t = threading.Thread(target=Func)
t.start() 使用Lock

使用锁

4
8 Process finished with exit code 0
可以看出,全局变量在在每次被调用时都要获得锁,才能操作,因此保证了共享数据的安全性 运行结果

运行结果

Lock与Rlock 对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#coding:utf-8
  
import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire()  #产生了死锁。
lock.release()
lock.release()
print lock.acquire()
  
  
import threading
rLock = threading.RLock()  #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

Condition类


  Condition(条件变量)通常与一个锁关联。需要在多个Contidion*享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

  可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

构造方法: 
Condition([lock/rlock])

实例方法: 
  acquire([timeout])/release(): 调用关联的锁的相应方法。 
  wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 
  notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
  notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

例子一:生产者消费者模型

# encoding: UTF-8
import threading
import time # 商品
product = None
# 条件变量
con = threading.Condition() # 生产者方法
def produce():
global product if con.acquire():
while True:
if product is None:
print 'produce...'
product = 'anything' # 通知消费者,商品已经生产
con.notify() # 等待通知
con.wait()
time.sleep(2) # 消费者方法
def consume():
global product if con.acquire():
while True:
if product is not None:
print 'consume...'
product = None # 通知生产者,商品已经没了
con.notify() # 等待通知
con.wait()
time.sleep(2) t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start() 生产者消费者模型

生产者消费者模型

produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume... Process finished with exit code -1
程序不断循环运行下去。重复生产消费过程。 运行结果

运行结果

例子二:生产者消费者模型

import threading
import time condition = threading.Condition()
products = 0 class Producer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products < 10:
products += 1;
print "Producer(%s):deliver one, now products:%s" %(self.name, products)
condition.notify()#不释放锁定,因此需要下面一句
condition.release()
else:
print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
condition.wait();#自动释放锁定
time.sleep(2) class Consumer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products > 1:
products -= 1
print "Consumer(%s):consume one, now products:%s" %(self.name, products)
condition.notify()
condition.release()
else:
print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
condition.wait();
time.sleep(2) if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start() for c in range(0, 3):
c = Consumer()
c.start() 生产者消费者模型

生产者消费者模型

例子三:

import threading

alist = None
condition = threading.Condition() def doSet():
if condition.acquire():
while alist is None:
condition.wait()
for i in range(len(alist))[::-1]:
alist[i] = 1
condition.release() def doPrint():
if condition.acquire():
while alist is None:
condition.wait()
for i in alist:
print i,
print
condition.release() def doCreate():
global alist
if condition.acquire():
if alist is None:
alist = [0 for i in range(10)]
condition.notifyAll()
condition.release() tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start() 生产者消费者模型

生产者消费者模型

Event类


  Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

  Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

构造方法: 
Event()

实例方法: 
  isSet(): 当内置标志为True时返回True。 
  set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。 
  clear(): 将标志设为False。 
  wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

例子一

# encoding: UTF-8
import threading
import time event = threading.Event() def func():
# 等待事件,进入等待阻塞状态
print '%s wait for event...' % threading.currentThread().getName()
event.wait() # 收到事件后进入运行状态
print '%s recv event.' % threading.currentThread().getName() t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start() time.sleep(2) # 发送事件通知
print 'MainThread set event.'
event.set()
Thread-1 wait for event...
Thread-2 wait for event... #2秒后。。。
MainThread set event.
Thread-1 recv event.
Thread-2 recv event. Process finished with exit code 0

timer类


  Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

构造方法: 
Timer(interval, function, args=[], kwargs={}) 
  interval: 指定的时间 
  function: 要执行的方法 
  args/kwargs: 方法的参数

实例方法: 
Timer从Thread派生,没有增加实例方法。

例子一: 线程延迟5秒后执行。

# encoding: UTF-8
import threading def func():
print 'hello timer!' timer = threading.Timer(5, func)
timer.start()

local类


  local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

  可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

# encoding: UTF-8
import threading local = threading.local()
local.tname = 'main' def func():
local.tname = 'notmain'
print local.tname t1 = threading.Thread(target=func)
t1.start()
t1.join() print local.tname
notmain
main

线程锁threading.RLock和threading.Lock

我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。

例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。

import threading
import time globals_num = 0 lock = threading.RLock() def Func():
lock.acquire() # 获得锁
global globals_num
globals_num += 1
time.sleep(1)
print(globals_num)
lock.release() # 释放锁 for i in range(10):
t = threading.Thread(target=Func)
t.start()

threading.RLock和threading.Lock 的区别

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死琐。
lock.release()
lock.release()
1
2
3
4
5
6
import threading
rLock = threading.RLock()  #RLock对象
rLock.acquire()
rLock.acquire()    #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

RLOCK递归锁的用处  解锁多层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# _*_coding:utf-8_*_
# multi-thread
import  threading,time
number = 0
lock = threading.RLock()
def run(num):
    lock.acquire() #获取一把锁
    global number
    #print 'hi ,i am the thread,',num
    number += 1
    lock.release() #解锁  上面是锁内独占内容
    print number
    time.sleep(1)
for in range(20):
    = threading.Thread(target=run,args=(i,))
    t.start()

  

  

threading.Event

Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

  • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
  • Event.set() :将标识位设为Ture
  • Event.clear() : 将标识伴设为False。
  • Event.isSet() :判断标识位是否为Ture。
import threading

def do(event):
print('start')
event.wait()
print('execute') event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start() event_obj.clear()
inp = input('input:')
if inp == 'true':
event_obj.set()

当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

#!/usr/bin/env python
# _*_ coding:utf-8 _*_ import threading
import time
import random def light():
if not event.isSet(): #没有设置的话
event.set() # 设置绿灯
count = 0 #计数器秒数
while True:
if count < 10: #小于十秒 是绿灯
print("\033[42;1m ------green light on ----\033[0m")
elif count < 13: #小于13秒 大于10秒 是黄灯
print("\033[43;1m ------yellow light on ----\033[0m")
elif count < 20: #小于于20秒 有设置则取消
if event.isSet():
event.clear()
print("\033[41;1m ------red light on ----\033[0m")
else: #大于20 重新
count = 0 #取消秒数计时
event.set() #重新变为绿灯 time.sleep(1)
count +=1 def car(n): # 第二个线程 车线程
while 1:
time.sleep(random.randrange(3)) #随机等待三秒
if event.isSet():
print("car [%d] is running..." % n ) #如果被设置了信号则是绿灯,该线程的车即可通过
else: #否则的话提示红灯
print("car [%d] is waitting for the red light.." %n)
event.wait() #红灯的话,会在此处卡住,不往下执行
print("Green light is on ,car %s is running......." %n)
if __name__ == '__main__': #下面是定义了两个线程 ,灯线程 车线程, threading.Event用来设置标着符号让两个线程交流
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()

!!线程之间交互  threading.Event方法  红灯 绿灯  信号标志位

#_*_coding:utf-8_*_

import threading
import time
import random def light():
if not event.isSet(): #没有设置的话
event.set() # 设置绿灯
count = 0 #计数器秒数
while True:
if count < 10: #小于十秒 是绿灯
print "\033[42;1m ------green light on ----\033[0m"
elif count < 13: #小于13秒 大于10秒 是黄灯
print "\033[43;1m ------yellow light on ----\033[0m"
elif count < 20: #小于于20秒 有设置则取消
if event.isSet():
event.clear()
print "\033[41;1m ------red light on ----\033[0m"
else: #大于20 重新
count = 0 #取消秒数计时
event.set() #重新变为绿灯 time.sleep(1)
count +=1 def car(n): # 第二个线程 车线程
while 1:
time.sleep(random.randrange(3)) #随机等待三秒
if event.isSet():
print "car [%s] is running..." % n #如果被设置了信号则是绿灯,该线程的车即可通过
else: #否则的话提示红灯
print "car [%s] is waitting for the red light.." %n
event.wait() #红灯的话,会在此处卡住,不往下执行
print "Green light is on ,car %s is running......." %n
if __name__ == '__main__': #下面是定义了两个线程 ,灯线程 车线程, threading.Event用来设置标着符号让两个线程交流
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()

threading.Condition:

一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。

condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。

其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

Condition(lock=None)

Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

  • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。

在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。 例子: 生产者-消费者模型,

import threading
import time
def consumer(cond):
with cond:
print("consumer before wait")
cond.wait()
print("consumer after wait") def producer(cond):
with cond:
print("producer before notifyAll")
cond.notifyAll()
print("producer after notifyAll") condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

consumer()线程要等待producer()设置了Condition之后才能继续。

queue模块

Queue 就是对队列,它是线程安全的

举例来说,我们去肯德基吃饭。厨房是给我们做饭的地方,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。

这个模型也叫生产者-消费者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import queue
= queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
q.join()    # 等到队列为kong的时候,在执行别的操作
q.qsize()   # 返回队列的大小 (不可靠)
q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block=True, timeout=None#  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                         为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
                          如果队列无法给出放入item的位置,则引发 queue.Full 异常
q.get(block=True, timeout=None#   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
q.put_nowait(item) #   等效于 put(item,block=False)
q.get_nowait() #    等效于 get(item,block=False)

  

生产者--消费者:

#!/usr/bin/env python
import Queue
import threading message = Queue.Queue(10) def producer(i):
while True:
message.put(i) def consumer(i):
while True:
msg = message.get() for i in range(12):
t = threading.Thread(target=producer, args=(i,))
t.start() for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
t.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# _*_coding:utf-8_*_
# 服务器端socket
import threading,time
import Queue
import random
q=Queue.Queue()
def Produce(name):
    for in range(20):
        q.put(i) #放到队列
        print "\033[32;1mProducer %s has made %s baozi .. \033[0m" % (name,i)
        time.sleep(random.randrange(1)) #随机休息
def Consumer(name):
    count = 0
    while count <20:
        data = q.get() #取得队列上的
        print "\033[31;1mConsumer %s has eaten %s baozi ... chihuo.. \033[0m" %  (name,data)
        count += 1
        time.sleep(random.randrange(4))  #随机休息,但是取得的东西比生产快
= threading.Thread(target=Produce,args=('alex',)) # 生产者
= threading.Thread(target=Consumer,args=('liu',)) #消费者
p.start() #启动
c.start()

双向队列  稍后总结:

==????===??

 多进程  

multiprocessing模块

multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。

在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'liujianzuo'
from multiprocessing import Process
def f(name):
    print("hello",name)
if __name__ == "__main__":
    = Process(target=f, args=("alex",))
    print(123)
    p.start()
    p.join() 

注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销  

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。
该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),
用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。 但在使用这些共享API的时候,我们要注意以下几点: 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。
对于多线程来说,由于只有一个进程,所以不存在此必要性。
multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。
此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

进程间的数据共享

Array 共享进程间的列表  linux无问题。windows下有问题

# 内存演示, 线程的内存是1份  进程是独自申请线性地址空间

# 线程

import time
import threading
li = [] def f(i):
time.sleep(3)
li.append(i)
print(i,li) if __name__ == '__main__':
for i in range(10):
t = threading.Thread(target=f,args=(i,))
t.start()

线程对比代码

# 进程 Array  进程之间数据共享  linux 可以执行输出

import multiprocessing

li = []

def f(i):
time.sleep(3)
li.append(i)
print(i,li) if __name__ == '__main__':
for i in range(10):
p = multiprocessing.Process(target=f,args=(i,))
p.start() from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44]) def Foo(i,temp):
temp[i] = 100+i
for item in temp:
print(i,'----->',item)
if __name__ == '__main__':
for i in range(2):
p = Process(target=Foo,args=(i,temp))
p.start() 不同平台的进程共享数据的代码

不同平台的进程共享数据的代码

Manager  共享进程间的 字典 和列表  linux 可以执行输出

from multiprocessing import Process,Manager

def f(i,dic):
dic[i] = 100 + i
for k,v in dic.items():
print(k,v)
print("end") if __name__ == "__main__":
manage = Manager()
dic = manage.dict()
for i in range(2):
p = Process(target=f, args=(i,dic))
p.start()
p.join() !/usr/bin/env python
_*_coding:utf-8_*_
from multiprocessing import Process,Manager def f(i,li):
li.append(i) for i in li:
print(i)
print("end") if __name__ == "__main__":
manage = Manager()
li = manage.list()
for i in range(2):
p = Process(target=f, args=(i,li))
p.start()
p.join() manager方式 字典 列表的共享

Manager进程数据共享

Pipe例子 管道

import  multiprocessing,time

def f(conn):
conn.send([42,None,'HELLO'])
conn.close if __name__ == '__main__': #管道发送 子进程数据给父进程接收
parent_conn,child__conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=f,args=(child__conn,))
p.start()
print parent_conn.recv() #prints "[42,None,'hello']"
p.join() #_*_coding:utf-8_*_
__author__ = 'jianzuo' import multiprocessing,time def f(conn):
conn.send([42,None,'HELLO'])
conn.close if __name__ == '__main__': #管道发送 子进程数据给父进程接收 A,B = multiprocessing.Pipe() #赋值A B为管道两端
p = multiprocessing.Process(target=f,args=(B,)) #将b子进程数据发送到管道
p.start()
print A.recv() #prints "[42,None,'hello']" #父进程接收
p.join()

pipe管道数据交流

Multiprocess 的queue 队列 案例

import  multiprocessing,time

def f(conn,q):
conn.send([42,None,'HELLO'])
q.put('hahhahhah')
conn.close if __name__ == '__main__': #管道发送 子进程数据给父进程接收
'''
parent_conn,child__conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=f,args=(child__conn,))
p.start()
print parent_conn.recv() #prints "[42,None,'hello']"
p.join()
''' A,B = multiprocessing.Pipe() #赋值A B为管道两端
Q = multiprocessing.Queue()
p = multiprocessing.Process(target=f,args=(B,Q)) #将b子进程数据发送到管道
p.start()
print A.recv() #prints "[42,None,'hello']" #父进程接收
print "queue....----",Q.get()
p.join()

多进程的queue队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#方法一,Array
from multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])
  
def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print i,'----->',item
  
for in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
  
#方法二:manage.dict()共享数据
from multiprocessing import Process,Manager
  
manage = Manager()
dic = manage.dict()
  
def Foo(i):
    dic[i] = 100+i
    print dic.values()
  
for in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()
'c': ctypes.c_char,  'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

对象类型表

当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

多进程LOCK,如果不锁 屏幕抢占输出进程锁实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Array, RLock
def Foo(lock,temp,i):
    """
    将第0个数加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print i,'----->',item
    lock.release()
lock = RLock()
temp = Array('i', [11, 22, 33, 44])
for in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()
  

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async

  实际应用中,并不是每次执行任务的时候,都去创建多进程,而是维护了一个进程池,每次执行的时候,都去进程池取一个,如果进程池里面的进程取光了,就会阻塞在那里,直到进程池中有可用进程为止。首先来看一下进程池提供了哪些方法

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 等待任务完成后在停止工作进程,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : 等待工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait,否则进程会成为僵尸进程。

下面来简单的看一下代码怎么用的

# join相当于wait 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process,Pool
import time
def foo(i):
    time.sleep(3)
    print(i+100)
    return i+100
def bar(args):
    print(args)
if __name__ == '__main__':
    pool = Pool(6)
    for in range(12):
        # pool.apply(func=foo, args=(i,)) # apply 默认是依次执行完每个进程才会进行下一个,相当于Daemon = False。每一个任务是排队执行的,有进程.join()
        pool.apply_async(func=foo, args=(i,), callback=bar) # apply_async默认是不等待的相当于Daemon = True,,批量生成 , 而async这个方法还可以用回调函数,
                                   即,foo的执行完,其返回值传给回调函数,做处理。每一个任务都是并发进行的,不等待。无进程.join()
    print("end")
    pool.close() # 阻止执行 超过规定的进程数量  close 是每个进程运行完毕才会继续,terminate是不管执行没有执行完,都跳过
    pool.join() # 上面必须有个close 或者terminate  否则发生assert错误 断言错误
     # join相当于wait  在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。
对于多线程来说,由于只有一个进程,所以不存在此必要性。

  

协程

协程内部就是控制单个线程来回跳动执行的,又称微线程,不需要cpu的操作时候,比如并发请求网站的时候 。是网卡io请求,我们就可以用协程。爬虫。

协程,又称微线程,协程执行看起来有点像多线程,但是事实上协程就是只有一个线程,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显,此外因为只有一个线程,不需要多线程的锁机制,也不存在同时写变量冲突。协程的适用场景:当程序中存在大量不需要CPU的操作时(IO)下面来看一个利用协程例子

协程的一个基本模块 greenlet ,功能很少,需要手动控制。而丰富的协程模块gevent 就是底层调用的greenlet,可以自动跳。

安装方法

参考http://www.cnblogs.com/liujianzuo888/articles/5507196.html

源码或者二进制exe方式安装:需要手动安装依赖

pip安装:会自动安装依赖,类似linux的yum

1
2
3
python -m pip install --upgrade pip
python -m pip install gevent

  

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

greenlet

1
# 遇到greenlet 的switch 某个函数就会跳。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/usr/bin/env python
# -*- coding:utf-8 -*-
  
  
from greenlet import greenlet
  
  
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
  
  
def test2():
    print 56
    gr1.switch()
    print 78
  
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

相比greenlet gevent 是调的greenlet ,可以自动跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
  
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
  
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
  
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

  遇到IO操作自动切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from gevent import monkey; monkey.patch_all()
import gevent
import urllib2
def f(url):
    print('GET: %s' % url)
    resp = urllib2.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

  

管理上下文的模块contextlib

1
2
3
4
5
6
7
8
9
10
11
12
13
import contextlib<br>@contextlib.contextmanager
def myopen(path_file,mode):
    = open(path_file,mode)
    try:
        yield f
    finally:
        f.close()
import os,sys
# path = os.path.join(os.path.dirname(__file__),"s2.py")
with myopen("s2.py","r") as f1:
    for line in f1:
        print(line)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import contextlib
li = [1,]
@contextlib.contextmanager
def f1(list,arg):
    li.append(arg)
    try:
        yield
    finally:
        print(123)
        li.remove(arg)
with f1(li,3):
    print(li)