操作系统帮你封装好硬件复杂的接口,提供比较好的接口给应用程序去调,应用程序调硬件只需调操作系统的接口就可以了;操作系统负责管理运行的多个进程
多道技术:(针对单核实现并发(看起来是同时运行的)第三代计算机)
CPU、内存、硬盘
空间上的复用;(实现物理层面的隔离,都读到内存空间里)
时间上的复用:遇到I/O操作去硬盘读取内容时cpu要去切(提升效率)、遇到一个程序运行时间过长了也要切(实现并发的效果反而会降低效率);
多核,叫并行了,真正意义上的并行;
并发编程之多进程
进程理论
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。同一个程序执行两次,那也是两个进程。
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
并发和并行
一 并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发。
二 并行:同时运行,只有具备多个cpu才能实现并行
单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)
有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,
一旦任务1遇到I/O就*中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术
而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算)
可能被分 配给四个cpu中的任意一个去执行
进程的层次结构
无论UNIX还是windows,进程只有一个父进程,不同的是:
-
在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。
-
在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。
开启子进程的两种方式:
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,>提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu
Process类的介绍
创建进程的类
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
方法介绍
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。
属性介绍
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name:进程的名称
Process类的使用
注意:在windows中Process()必须放到# if __name__ == '__main__':下
##创建子进程方式一
from multiprocessing import Process import time def task(name): print('%s is done'%name) time.sleep(3) print('%s is done'%name) if __name__ == '__main__': p = Process(target=task,args=('子进程1',)) #实例化得到一个对象 #Process(target=task,kwargs={'name':'子进程1'}) p.start()#仅仅是给操作系统发送个信号 (我父进程有个子进程你去给我开吧);父进程不会等着它开启 #子进程的初始状态跟父进程一样,但运行是独立的 print('主') #打印 主 子进程1 is done 子进程1 is done
###方式二 from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): #固定写法,子进程 print('%s is done'%self.name) time.sleep(3) print('%s is done'%self.name) if __name__ == '__main__': p = MyProcess('子进程') p.start() #调用那个run print('主')
查看pid
from multiprocessing import Process import time, os def task(): print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) #看进程编号,看那个在执行 time.sleep(3) print('%s is done,parent id is %s'%(os.getpid(),os.getppid()))#os.getppid()就是它父的编号 if __name__ == '__main__': p = Process(target=task, ) # p.start() print('主',os.getpid(),os.getppid())#os.getppid()是pycharm的,主进程的爹 #打印: 主 6816 3580 6704 is done,parent id is 6816 6704 is done,parent id is 6816
僵尸进程和孤儿进程(了解)
父进程开启一个子进程,他自己结束之后,会等子进程运行完,他才会结束。
所有的子进程都会有个僵尸进程,他死了之后,会保留尸体,父进程可以随时查看他的状态。子进程在父进程一直不死的情况下会有害,多个僵尸进程占用内存。
父进程先死掉,子进程还在就是孤儿进程;在linux系统之上会有一个INIT,这个进程是所有进程它爹,INIT相当于孤儿院,由它来接管;
Process对象的其他属性和方法
join()
from multiprocessing import Process import time, os def task(): print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) #看进程编号,看哪个在执行 time.sleep(3) print('%s is done,parent id is %s'%(os.getpid(),os.getppid()))#os.getppid()就是它父的编号 if __name__ == '__main__': p = Process(target=task, ) # p.start() p.join()#主进程在等p结束,这一行不过去永远不会执行下一步,最后执行主进程 print('主',os.getpid(),os.getppid()) print(p.pid) #进程结束掉之后,子进程运行完之后可以查看它的pid,但是主进程结束后就没有了,僵尸儿子才会被回收掉。在cmd命令下查看5300是否还存在,结果是被回收了
#打印:
5300 is done,parent id is 4780
5300 is done,parent id is 4780
主 4780 3580
5300
from multiprocessing import Process import time, os def task(name): print('%s is dones'%name) time.sleep(3) if __name__ == '__main__': p1 = Process(target=task, args=('子进程1',)) # p2 = Process(target=task, args=('子进程2',)) p3 = Process(target=task, args=('子进程3',)) p1.start() p2.start() p3.start() print('主',os.getpid(),os.getppid()) #打印 结果不唯一,p1.start()、p2、p3只是给给操作系统发个信息,操作系统先运行谁说不准,多打印几次结果就不一样了 主 4804 3580 子进程2 is dones 子进程1 is dones 子进程3 is dones
from multiprocessing import Process import time, os def task(name, n): print('%s is dones'%name) #看进程编号,看那个在执行 time.sleep(n) if __name__ == '__main__': start = time.time() p1 = Process(target=task, args=('子进程1',5)) # p2 = Process(target=task, args=('子进程2',3)) p3 = Process(target=task, args=('子进程3',2)) p_l = [p1,p2,p3] # p1.start() # p2.start() # p3.start() #也可以改成for循环 for p in p_l: p.start() # p1.join() # p2.join() # p3.join() for p in p_l: p.join() print('主',(time.time()-start)) #等的就是那个最长的时间,程序仍然是并发执行,不是串连执行 打印: 子进程1 is dones 子进程3 is dones 子进程2 is dones 主 5.244300127029419
from multiprocessing import Process import time, os def task(name, n): print('%s is dones'%name) #看进程编号,看那个在执行 time.sleep(n) if __name__ == '__main__': start = time.time() p1 = Process(target=task, args=('子进程1',5)) # p2 = Process(target=task, args=('子进程2',3)) p3 = Process(target=task, args=('子进程3',2)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print('主',(time.time()-start)) #这样就是串行了,等1结束,然后等2、等3 #打印 子进程1 is dones 子进程2 is dones 子进程3 is dones 主 10.41359567642212
属性
### is_alive()判断是否存活 from multiprocessing import Process import time, os def task(): print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) time.sleep(3) print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p = Process(target=task, ) # p.start() print(p.is_alive()) p.join() print('主',os.getpid(),os.getppid()) print(p.pid) print(p.is_alive()) #看看p是否还活着
###terminate() 、 name属性
from multiprocessing import Process import time, os def task(): print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) time.sleep(3) print('%s is done,parent id is %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p = Process(target=task, name= 'sub-process') # p.start() p.terminate()#杀死这个p进程,它只是给操作系统发个信息,杀死进程是要回收内存空间的是由操作系统说了算 time.sleep(3)#不加这个杀不死,要等操作系统缓存下 print(p.is_alive()) print('主') print(p.name) #不指定就默认Process-1 打印: False 主 sub-process
练习题
1、进程之间的内存空间是共享的还是隔离的?下述代码的执行结果是什么?
进程内存空间是隔离的。
from multiprocessing import Process n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了 def work(): global n n=0 print('子进程内: ',n) if __name__ == '__main__': p=Process(target=work) p.start() p.join()##确保子进程已经运行过了 print('主进程内: ',n) 打印: 子进程内: 0 主进程内: 100
2、基于多进程实现并发的套接字通信?
##服务端 from socket import * from multiprocessing import Process def talk(conn): while True: try: data = conn.recv(1024) #干通信的活 if not data:break conn.send(data.upper()) except ConnectionRefusedError: break conn.close() def server(ip, port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(5) while True: conn,addr = server.accept() ##主进程专门干链接的活,一直建链接,每建成一个链接就起一个进程跟客户端通信 p = Process(target=talk,args=(conn,)) p.start() server.close() if __name__ == '__main__': server('127.0.0.1',8080)
##客户端 from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 8080)) while True: msg = input('>>>:').strip() if not msg :continue client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8'))
3、改写下列程序,分别别实现下述打印效果
# from multiprocessing import Process # import time # import random # # def task(n): # time.sleep(random.randint(1,3)) # print('-------->%s' %n) # # if __name__ == '__main__': # p1=Process(target=task,args=(1,)) # p2=Process(target=task,args=(2,)) # p3=Process(target=task,args=(3,)) # # p1.start() # p2.start() # p3.start() # # p1.join() # p2.join() # p3.join() # print('-------->4')
打印:#---->4最后打印
-------->1
-------->3
-------->2
-------->4
from multiprocessing import Process import time import random def task(n): time.sleep(random.randint(1,3)) print('-------->%s' %n) if __name__ == '__main__': p1=Process(target=task,args=(1,)) p2=Process(target=task,args=(2,)) p3=Process(target=task,args=(3,)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() print('-------->4')
打印:#按顺序输出
-------->1
-------->2
-------->3
-------->4
守护进程
主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。
守护进程需要强调两点:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止。
from multiprocessing import Process import time def task(name): print('%s is done'%name) time.sleep(3) p = Process(target=time.sleep, args=(3,)) #再开子进程会报错,守护进程不让开 p.start() if __name__ == '__main__': p = Process(target=task, args=('子进程1',)) p.daemon = True #要在开始之前设置 p.start() p.join() #确保了守护进程确实运行完毕了 print('主') #只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了。 打印: 子进程1 is done Process Process-1: Traceback (most recent call last): File ..... AssertionError: daemonic processes are not allowed to have children 主
#练习 from multiprocessing import Process import time def foo(): #守护进程 print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #运行到这一行的时候p1 p2还没出来; 打印: main------- #只要它出现,p1立马就死了; 456 end456
互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱。
如何控制,就是加锁处理。而互斥锁的意思就是互相排斥,如果把多个进程比喻为多个人,互斥锁的工作原理就是多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁,其他人才有可能有一个抢到......所以互斥锁的原理,就是把并发改成穿行,降低了效率,但保证了数据安全不错乱。
from multiprocessing import Process import time def task(name): print('%s 1' %name) time.sleep(1) print('%s 2' %name) time.sleep(1) print('%s 3' %name) if __name__ == '__main__': for i in range(3): p=Process(target=task,args=('进程%s' %i,)) p.start() 打印:#竞争 谁抢这终端了谁打印 进程0 1 进程1 1 进程2 1 进程0 2 进程1 2 进程2 2 进程0 3 进程1 3 进程2 3
from multiprocessing import Process, Lock import time def task(name, mutex): #所有的子进程来抢这把锁,然后来运行下面这段代码;运行完之后,其他字进程才进入 mutex.acquire() #加锁 print('%s 1' %name) time.sleep(1) print('%s 2' %name) time.sleep(1) print('%s 3' %name) mutex.release() #运行完之后要释放 if __name__ == '__main__': mutex = Lock() #父进程里边早出来这个变量对象,子进程也会拷贝一份 for i in range(3): p=Process(target=task,args=('进程%s' %i, mutex))#传给子进程,所有的子进程用一把锁 p.start() #打印: 进程0 1 进程0 2 进程0 3 进程2 1 进程2 2 进程2 3 进程1 1 进程1 2 进程1 3
模拟抢票
#查看票数的时候是并发,购票是串,一个一个来,加个互斥锁
#文件db.txt的内容为:{"count":1} #一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock import json import time def search(name): time.sleep(1) dic=json.load(open('db.txt','r',encoding='utf-8')) print('<%s> 查看到剩余票数【%s】' %(name,dic['count'])) def get(name): time.sleep(1) dic=json.load(open('db.txt','r',encoding='utf-8')) if dic['count'] > 0: dic['count']-=1 time.sleep(3) ##模拟写数据的网络延迟 json.dump(dic,open('db.txt','w',encoding='utf-8')) print('<%s> 购票成功' %name) def task(name,mutex): search(name) mutex.acquire() get(name) mutex.release() if __name__ == '__main__': mutex=Lock() for i in range(10): #模拟并发10个客户端抢票 p=Process(target=task,args=('路人%s' %i,mutex)) p.start() 打印: <路人0> 查看到剩余票数【1】 <路人2> 查看到剩余票数【1】 <路人1> 查看到剩余票数【1】 <路人4> 查看到剩余票数【1】 <路人3> 查看到剩余票数【1】 <路人6> 查看到剩余票数【1】 <路人5> 查看到剩余票数【1】 <路人8> 查看到剩余票数【1】 <路人7> 查看到剩余票数【1】 <路人9> 查看到剩余票数【1】 <路人0> 购票成功
互斥锁和join的区别
互斥锁就是把并发变成串行,保证多个进程去修改同一个数据的时候大家一个一个修改,牺牲了效率保证了数据的安全;
join也可以使程序串行
发现使用join将并发改成串行,确实能保证数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否,此时join与互斥锁的区别就显而易见了,join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行。
from multiprocessing import Process,Lock import json import time def search(name): time.sleep(1) dic=json.load(open('db.txt','r',encoding='utf-8')) print('<%s> 查看到剩余票数【%s】' %(name,dic['count'])) def get(name): time.sleep(1) dic=json.load(open('db.txt','r',encoding='utf-8')) if dic['count'] > 0: dic['count']-=1 time.sleep(3) json.dump(dic,open('db.txt','w',encoding='utf-8')) print('<%s> 购票成功' %name) else: print('<%s> 购票失败' %name) def task(name,): search(name) # mutex.acquire() get(name) # mutex.release() if __name__ == '__main__': # mutex=Lock() for i in range(10): p=Process(target=task,args=('路人%s' %i,)) p.start() p.join()#把进程整个代码都串行了,看票也是串行的 打印: <路人0> 查看到剩余票数【0】 <路人0> 购票失败 <路人1> 查看到剩余票数【0】 <路人1> 购票失败 <路人2> 查看到剩余票数【0】 <路人2> 购票失败 <路人3> 查看到剩余票数【0】 <路人3> 购票失败 <路人4> 查看到剩余票数【0】 <路人4> 购票失败 <路人5> 查看到剩余票数【0】 <路人5> 购票失败 <路人6> 查看到剩余票数【0】 <路人6> 购票失败 <路人7> 查看到剩余票数【0】 <路人7> 购票失败 <路人8> 查看到剩余票数【0】 <路人8> 购票失败 <路人9> 查看到剩余票数【0】 <路人9> 购票失败
总结
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1、效率低(共享数据基于文件,而文件是硬盘上的数据)
2、需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列的使用
进程之间内存之间是互相隔离的,但可以共享(加锁)数据,读写效率低;我用一块共享内存,读写效率就高了,也解决了加锁的问题
IPC,进程之间的通信:队列和管道,用的都是内存的空间,共享内存;队列就是管道加锁实现的。
队列,先进先出
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize是队列中允许最大项数,省略则无大小限制。
但需要明确:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
主要方法:
q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。
使用:
from multiprocessing import Queue q=Queue(3) q.put('hello') q.put({'a':1}) q.put([3,3,3,]) print(q.full()) # q.put(4) #只能放三个数据,然后会把它给锁上,卡着不能运行了,阻塞了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #清空了 print(q.get()) 打印: #最后空数据,卡着了,取不到了 True hello {'a': 1} [3, 3, 3] True
生产者消费者模型
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者和消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
from multiprocessing import Process,Queue import time def producer(q): for i in range(6): #模拟生产6个数据 res='包子%s' %i time.sleep(0.5) print('生产者生产了%s' %res) q.put(res) #生产完不是直接给消费者,丢到队列里边 def consumer(q): while True: #不知道接几次 res=q.get() #接结果 if res is None:break #消费者最后会收到那个None,这样就不会卡那了 time.sleep(1) print('消费者吃了%s' % res) if __name__ == '__main__': #容器 q=Queue() #生产者们,生产者可能有多个 p1=Process(target=producer,args=(q,)) #传参 p2=Process(target=producer,args=(q,)) p3=Process(target=producer,args=(q,)) #消费者们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() #主进程保证p1这个子进程运行完了 p2.join() p3.join() q.put(None) #None代表结束信号,有几个消费者来几个信号;;应该是在主进程里边确保所有的生产者都生产结束之后才发结束信号 q.put(None) #有几个消费者就应该要有几个信号, print('主') 打印: 生产者生产了包子0 生产者生产了包子0 生产者生产了包子0 生产者生产了包子1 生产者生产了包子1 生产者生产了包子1 消费者吃了包子0 生产者生产了包子2 生产者生产了包子2 消费者吃了包子0 生产者生产了包子2 生产者生产了包子3 生产者生产了包子3 生产者生产了包子3 消费者吃了包子0 生产者生产了包子4 消费者吃了包子1 生产者生产了包子4 生产者生产了包子4 生产者生产了包子5 生产者生产了包子5 生产者生产了包子5 主 消费者吃了包子1 消费者吃了包子1 消费者吃了包子2 消费者吃了包子2 消费者吃了包子2 消费者吃了包子3 消费者吃了包子3 消费者吃了包子3 消费者吃了包子4 消费者吃了包子4 消费者吃了包子4 消费者吃了包子5 消费者吃了包子5 消费者吃了包子5
用处:程序中有两类角色,一类是生产数据、另一类是处理数据,就可以引入生产者消费者模型来解决耦合的问题; 生产者<--->队列<--->消费者
好处:平衡生产者与消费者之间的速度差;程序解开耦合。
用process下每个实现,生产者、消费者、和那个q都要在一个计算机上,集中式的;带来的问题:稳定性(计算机断了,集中式都断掉了)、效率和性能(一台计算机总归是有极限的)。 把程序各个组件分配出去,分给各个机器,效率和稳定性都提升了。基于网络套接字,把消息发来取走;基于网络通信的消息队列典型代表是Rabbitmq。
JoinableQueue
在有多个生产者和多个消费者时,有几个消费者就需要发送几次结束信号:相当low,可以用joinableQueue来解决
from multiprocessing import Process,JoinableQueue import time def producer(q): for i in range(2): res='包子%s' %i time.sleep(0.5) print('生产者生产了%s' %res) q.put(res) q.join() #所有的生产者生产完了之后,队列都取没了就算结束了; 生产完了之后就在这等着,数据都取走之后就不等了 def consumer(q): while True: res=q.get() #卡在这里;东西取没了就算结束 if res is None:break time.sleep(1) print('消费者吃了%s' % res) q.task_done() #提供的接口,从队列里边取走的信号 消费者给生产者发个信号,告诉生产者有一个数据取走了;跟原来反着的,原来是生产者给消费者发信号 if __name__ == '__main__': #容器 q=JoinableQueue() #q.join()#等队列执行完,队列取没了就算完了 #生产者们 p1=Process(target=producer,args=(q,)) p2=Process(target=producer,args=(q,)) p3=Process(target=producer,args=(q,)) #消费者们 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True #把c1、c2设置成守护进程,这样就可以正常结束了 c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() #保证生产者结束完了 p2.join() p3.join() #执行到这一步,可以保证 q被取空了,消费者已经都完全取走了。 print('主') #这一行运行完之后,消费者就没意义存在了,这样就用到了守护进程 打印 生产者生产了包子0 生产者生产了包子0 生产者生产了包子0 生产者生产了包子1 生产者生产了包子1 生产者生产了包子1 消费者吃了包子0 消费者吃了包子0 消费者吃了包子0 消费者吃了包子1 消费者吃了包子1 消费者吃了包子1 主
线程
进程就是把资源给隔离开
每启动一个进程都会有一个线程;进程只是资源单位,并不能真正执行,进程内开的那个线程才是真正的运行单位;一个进程内起多个线程,跨部门之间的线程是不共享数据的,隔着进程的,同一进程内线程间是共享资源的; 开个部门起进程的开销更大,申请空间。
开启线程的两种方式
# import time # import random # from threading import Thread # # def piao(name): # print('%s piaoing' %name) # time.sleep(random.randrange(1,5)) # print('%s piao end' %name) # # if __name__ == '__main__': # t1=Thread(target=piao,args=('egon',)) # t1.start() # print('主线程') #只要是开了个进程,只是开了个内存空间,其实它会自动创建个线程。上边一共有2个线程; import time import random from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s piao end' %self.name) if __name__ == '__main__': t1=MyThread('egon') t1.start() print('主') 打印: egon piaoing 主 egon piao end
#1开进程的开销远大于开线程 import time from threading import Thread from multiprocessing import Process def piao(name): print("%s piaoing"%name) time.sleep(2) print("%s piao end"%name) if __name__ == "__main__": #p1 = Process(target=piao, args=('egon', )) #它要申请内存空间 #p1.start() t1 = Thread(target=piao, args=('egon', )) t1.start()#信号发出以后,线程立马就起来了 print('主线程')
#2同一个进程内多个线程共享该进程的地址空间 from threading import Thread from multiprocessing import Process n = 100 def task(): global n n = 0 if __name__ == "__main__": # p1 = Process(target=task, ) #它要申请内存空间 # p1.start() #开一个子进程,会copy主进程的内存空间 # p1.join()#确保它执行完了 打印出的是100 t1 = Thread(target=task, ) t1.start()#共享 t1.join() print('主线程', n) #子进程改了,不影响主进程,改的是它自己内存空间的,打印的是0
# 3、瞅一眼pid from threading import Thread from multiprocessing import Process,current_process import os def task(): # print(current_process().pid) #查看线程id,不能看父进程(用os)的 print('子进程PID:%s 父进程的PID:%s' %(os.getpid(),os.getppid())) if __name__ == '__main__': p1=Process(target=task,) p1.start() # print('主进程',current_process().pid) print('主进程',os.getpid()) 打印 主进程 3088 子进程PID:7728 父进程的PID:3088
from threading import Thread import os def task(): print('子线程:%s' %(os.getpid())) #一个进程内的线程大家的地位是一样的 if __name__ == '__main__': t1=Thread(target=task,) t1.start() print('主线程',os.getpid()) #这两个线程的同属于一个进程 打印 子线程:6220 主线程 6220
Thread对象的其他属性或方法
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread,currentThread,active_count,enumerate import time def task(): print('%s is ruuning' %currentThread().getName()) time.sleep(2) print('%s is done' %currentThread().getName()) if __name__ == '__main__': t=Thread(target=task,name='子线程1') t.start() # t.setName('儿子线程1') #设置名字 # t.join() # print(t.getName()) #t就是currentThread() # currentThread().setName('主线程') # print(t.isAlive()) #查看线程是否还活着,加了t.join就死掉了 # print('主线程',currentThread().getName()) 看下主线程用那个currentThread,查看当前线程名 # t.join() # print(active_count()) #活跃的线程数 只剩下主线程了,因为你join了 print(enumerate()) #把当前活跃的线程对象拿过来
守护线程
无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
1、对主进程来说,运行完毕指的是主进程代码运行完毕 2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕 1.1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, 2.1、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) # t.setDaemon(True) #必须在t.start()之前设置;两种守护线程设置方式,另外一种是下面 t.daemon=True t.start() #造线程,立马就造出来了,睡2s就足够打印下面"主线程'了 print('主线程') #2s把主线程都运行完了;主线程没有要等的了,然后就死掉了,守护进程跟着死,就不会打印上边那个say hello print(t.is_alive()) #打印这2s也足够它运行了 打印: 主线程 True
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) #t2非守护进程要等待3s,睡1s开到“end123”,再睡2s看到“end456” print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) #一共3个线程;t1为守护线程,t2为非守护线程;主线程运行完就盯着非守护进程运行完,主线程才运行完 t1.daemon=True t1.start() t2.start() print("main-------") 打印: 123 456 main------- end123 end456
互斥锁
把并行变成串行;将同时运行的多个任务变成一个一个执行,牺牲了效率,保证了数据的安全;保护不同的数据就要加不同的锁;
局部串行,只针对共享数据的部分修改,让它们串行;
#mutex from threading import Thread,Lock import time n = 100 def task(): global n mutex.acquire() #1个线程起来先去抢一把锁,在它睡0.1s时,其他99个线程同时会去抢锁,等第一个执行完后抢得锁,这时候n=99,然后再来回循环 temp = n time.sleep(0.1) #睡0.1s就足够其他99个线程启动运行了;都停在这睡之前都拿到了n=100了 n = temp - 1 #这100个数据都改成了99,数据变得不安全了,得加把锁 mutex.release() if __name__ == '__main__': mutex = Lock() t_l = [] for i in range(100): t = Thread(target=task) #在1个进程里边开了100个线程,共享空间的 t_l.append(t) t.start() for t in t_l: t.join() print('主', n) 打印: #这时候降低了效率,保证了数据的安全,如果不加锁,结果是 主 99 主 0
GIL的基本概念
同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
本质就是把互斥锁;启动一个py文件,就是启动了py解释器的一个进程;运行py程序
对于cpython解释器:垃圾回收机制+定期开启销毁。
对于cpython解释器要想用多核优势,就要开多个进程。
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程
############验证python test.py只会产生一个进程
#test.py内容 import os,time print(os.getpid()) time.sleep(1000) #打开终端执行 python3 test.py #在windows下查看 tasklist |findstr python #在linux下下查看 ps aux |grep python
一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内
1、所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码) 例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
2、所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
如果多个线程的target=work,那么执行流程是:
多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行。
GIL与自定义互斥锁
锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据;
然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
GIL保护的是解释器级别跟垃圾回收机制有关的数据;
mutex保护的是自己的数据;
代码要想执行就是要给py解释器,用的C代码,解释器上加那个GIL锁;
分析
1、100个线程去抢GIL锁,即抢执行权限 2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire() 3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,*交出执行权限,即释放GIL 4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程
GIL与多线程
有了GIL的存在,同一时刻同一个进程内的多个线程,只能有一个出来执行;
进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,
1、cpu到底是用来做计算的,还是用来做I/O的? 2、多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能 3、每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处
1、对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用 2、当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地
假设我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程
单核情况下,分析结果:
如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
多核情况下,分析结果:
如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
结论:
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
##计算密集型,应该用多进程 from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__ == '__main__': l=[] print(os.cpu_count()) #本机为4核 start=time.time() for i in range(4): p=Process(target=work) #多进程 耗时5s多 牺牲开进程的开销用上了多核优势 #p=Thread(target=work) #多线程 耗时18s多 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
#I/O密集型用多线程 from multiprocessing import Process from threading import Thread import threading import os,time def work(): time.sleep(2) #print('===>') if __name__ == '__main__': l=[] print(os.cpu_count()) #本机为4核 start=time.time() for i in range(400): #p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上 p=Thread(target=work) #耗时2s多 ,消耗的就是来回切的时间 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))
多线程用于IO密集型,如socket,爬虫,web 多进程用于计算密集型,如金融分析
死锁与递归锁
死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
# 死锁 from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到了A锁' %self.name) mutexB.acquire() #线程1拿到了B锁,还没人跟它抢,最多其他线程都去抢A锁 print('%s 拿到了B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到了B锁' % self.name) time.sleep(0.1) mutexA.acquire() #线程1执行f2,拿到B锁后,又去拿A锁,但这个时候A锁在第二个进程里边拿着呢 print('%s 拿到了A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() 打印:卡那了 Thread-1 拿到了A锁 Thread-1 拿到了B锁 Thread-1 拿到了B锁 Thread-2 拿到了A锁
# 互斥锁只能acquire一次 # from threading import Thread,Lock # # mutexA=Lock() # # mutexA.acquire() # mutexA.release()
如何解决呢,用递归锁
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次
# 递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,其他线程才能被抢到acquire from threading import Thread,RLock import time mutexB=mutexA=RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到了A锁' %self.name) mutexB.acquire() #这个时候计数器为2,其他进程都不能跟它抢 print('%s 拿到了B锁' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 拿到了B锁' % self.name) time.sleep(7) mutexA.acquire() print('%s 拿到了A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() 打印: Thread-1 拿到了A锁 Thread-1 拿到了B锁 Thread-1 拿到了B锁 Thread-1 拿到了A锁 Thread-2 拿到了A锁 Thread-2 拿到了B锁 Thread-2 拿到了B锁 Thread-2 拿到了A锁 Thread-4 拿到了A锁 Thread-4 拿到了B锁 Thread-4 拿到了B锁 Thread-4 拿到了A锁 Thread-6 拿到了A锁 Thread-6 拿到了B锁 Thread-6 拿到了B锁 Thread-6 拿到了A锁 Thread-8 拿到了A锁 Thread-8 拿到了B锁 Thread-8 拿到了B锁 Thread-8 拿到了A锁 Thread-10 拿到了A锁 Thread-10 拿到了B锁 Thread-10 拿到了B锁 Thread-10 拿到了A锁 Thread-5 拿到了A锁 Thread-5 拿到了B锁 Thread-5 拿到了B锁 Thread-5 拿到了A锁 Thread-9 拿到了A锁 Thread-9 拿到了B锁 Thread-9 拿到了B锁 Thread-9 拿到了A锁 Thread-7 拿到了A锁 Thread-7 拿到了B锁 Thread-7 拿到了B锁 Thread-7 拿到了A锁 Thread-3 拿到了A锁 Thread-3 拿到了B锁 Thread-3 拿到了B锁 Thread-3 拿到了A锁
信号亮
信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小。
from threading import Thread, Semaphore, currentThread import time, random sm = Semaphore(3) #有3个人可以抢到 def task(): # sm.acquire() # print('%s in'%currentThread().getName()) # sm.release() #加锁也可以用一个上下文管理的方式如下 with sm: print('%s in'%currentThread().getName()) time.sleep(random.randint(1,3)) if __name__ == '__main__': for i in range(10): t = Thread(target = task) t.start()
打印
Thread-1 in
Thread-2 in
Thread-3 in
Thread-4 in
Thread-5 in
Thread-6 in
Thread-7 in
Thread-8 in
Thread-9 in
Thread-10 in
Event事件
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
from threading import Event event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
#应用场景 from threading import Thread, Event import time event = Event() ###event.wait() #一直在那等着;直到等到event.set()才结束了 def student(name): print('学生%s 正在听课 '%name) event.wait(2) #可以设置个超时时间,过了3s即使没有给我发set信号我也可以接着干其他的 print('学生%s 课间活动 '%name) def teacher(name): print('老师%s 正在授课 '%name) time.sleep(8) event.set() if __name__ == '__main__': stu1 = Thread(target=student, args=('kris', )) stu2 = Thread(target=student, args=('alex', )) stu3 = Thread(target=student, args=('alen', )) t1=Thread(target=teacher,args=('egon',)) stu1.start() stu2.start() stu3.start() t1.start()
打印:
学生kris 正在听课
学生alex 正在听课
学生alen 正在听课
老师egon 正在授课
学生alex 课间活动
学生alen 课间活动
学生kris 课间活动
from threading import Thread,Event,currentThread import time event=Event() def conn(): #尝试链接,检测是否链接成功;event事件,一个等,一个唤醒 n=0 while not event.is_set(): #循环的发请求 if n == 3: print('%s try too many times' %currentThread().getName()) return print('%s try %s' %(currentThread().getName(),n)) event.wait(0.5) #等5s时间尝试, n+=1 print('%s is connected' %currentThread().getName()) def check(): #检测服务端是否正常运行 print('%s is checking' %currentThread().getName()) time.sleep(5) event.set() if __name__ == '__main__': for i in range(3): t=Thread(target=conn) t.start() t=Thread(target=check) t.start() 打印: Thread-1 try 0 Thread-2 try 0 Thread-3 try 0 Thread-4 is checking Thread-2 try 1 Thread-3 try 1 Thread-1 try 1 Thread-1 try 2 Thread-2 try 2 Thread-3 try 2 Thread-2 try too many times Thread-3 try too many times Thread-1 try too many times
定时器
定时器,指定n秒后执行某操作
from threading import Timer def task(name): print('hello %s' %name) t=Timer(5,task,args=('egon',)) t.start() 打印: hello egon
from threading import Timer import random class Code: def __init__(self): self.make_cache() #最开始实例化的时候就先拿到一个验证码; def make_cache(self, interval=9): #做缓存功能 self.cache = self.make_code() print(self.cache) self.t = Timer(interval, self.make_cache ) self.t.start() def make_code(self, n=4): #一个随机字符串 res = '' for i in range(n): s1 = str(random.randint(0, 9))#转成str为了拼接字符串 s2 = chr(random.randint(65, 90)) #拿到字母,对应ascii表里边的 res += random.choice([s1, s2]) return res def check(self): while True: code = input('请输入你的验证码>>>:').strip() #你输不对我就隔5s刷新一下 if code.upper() == self.cache: print('验证码输入正确') self.t.cancel() break obj = Code() obj.check()
线程queue
进程里边的queue是多个进程之间共享数据,解决处理共享锁的问题;
###基本用法 import queue q=queue.Queue(3) #先进先出->队列 q.put('first') q.put(2) q.put('third') # q.put(4) # q.put(4,block=False) #等同于 q.put_nowait(4) #不等待 # q.put(4,block=True,timeout=3) #默认为block=True;等3s print(q.get()) print(q.get()) print(q.get()) # print(q.get(block=False)) #q.get_nowait() # print(q.get_nowait()) # print(q.get(block=True,timeout=3))
##跟上边用法一样 import queue q=queue.LifoQueue(3) #后进先出->堆栈 q.put('first') q.put(2) q.put('third') print(q.get()) print(q.get()) print(q.get()) 打印: third 2 first
import queue q=queue.PriorityQueue(3) #优先级队列 数字越小优先级越高 q.put((10,'one')) q.put((40,'two')) q.put((30,'three')) print(q.get()) print(q.get()) print(q.get()) 打印: (10, 'one') (30, 'three') (40, 'two')
多线程实现并发的套接字通信
把服务端原来串行的多个任务放到不同的线程里边去,彼此不互相影响;
#基于线程池实现 ##控制了最大并发数,保证了服务端不会无限制的去开启线程
##服务端 from socket import * from concurrent.futures import ThreadPoolExecutor def communicate(conn): while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() pool.submit(communicate,conn) server.close() if __name__ == '__main__': pool=ThreadPoolExecutor(2) #最多开启2个线程,那么客户端也就最多开启2个了 server('127.0.0.1', 8081)
#服务端 from socket import * from threading import Thread def communicate(conn): #通信 while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): #负责建链接 server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() #建链接,建成功一次;立马起个线程;建好链接就给你干通信的活 t=Thread(target=communicate,args=(conn,)) #就像专门聘请个服务员来服务你 t.start() server.close() if __name__ == '__main__': server('127.0.0.1', 8081) #主线程,相当于招待人员
#客户端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8081)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
问题是机器不能无限制的起线程;
进程池线程池
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor #计算密集型多核优势的情况下应该用进程;I/O密集型的应该用线程 import os,time,random def task(name): print('name:%s pid:%s run' %(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool=ProcessPoolExecutor(4) #先把池子造好,指定最多放4个进程;不指定就是cpu的核字 #pool=ThreadPoolExecutor(5) #在一个进程里边PID是一样的 for i in range(10): pool.submit(task,'egon%s' %i) #把这10个任务全部丢给池子,没有阻塞 #异步调用,提交完任务不用等着任务执行拿到结果,只负责提交完了立马走 pool.shutdown(wait=True) #执行join操作;默认值就是True #等任务提交结束,把入口关了 print('主')
##线程池 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread import os,time,random def task(): print('name:%s pid:%s run' %(currentThread().getName(),os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool=ThreadPoolExecutor(5) for i in range(10): pool.submit(task,) pool.shutdown(wait=True) print('主')
异步调用和回调机制
可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
#1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行 from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print('%s is laing' %name) time.sleep(random.randint(3,5)) res=random.randint(7,13)*'#' return {'name':name,'res':res} def weigh(shit): name=shit['name'] size=len(shit['res']) print('%s 拉了 《%s》kg' %(name,size)) if __name__ == '__main__': pool=ThreadPoolExecutor(13) shit1=pool.submit(la,'alex').result() #往池子里提交任务,拿到结果 weigh(shit1) #称重 shit2=pool.submit(la,'wupeiqi').result() weigh(shit2) shit3=pool.submit(la,'yuanhao').result() weigh(shit3) 打印: alex is laing alex 拉了 《13》kg wupeiqi is laing wupeiqi 拉了 《13》kg yuanhao is laing yuanhao 拉了 《10》kg
#2、异步调用:提交完任务后,不地等待任务执行完毕, from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print('%s is laing' %name) time.sleep(random.randint(3,5)) res=random.randint(7,13)*'#' return {'name':name,'res':res} def weigh(shit): shit=shit.result() name=shit['name'] size=len(shit['res']) print('%s 拉了 《%s》kg' %(name,size)) if __name__ == '__main__': pool=ThreadPoolExecutor(13) pool.submit(la,'alex').add_done_callback(weigh) #绑定一个回调函数 pool.submit(la,'wupeiqi').add_done_callback(weigh) pool.submit(la,'yuanhao').add_done_callback(weigh)
打印:
alex is laing
wupeiqi is laing
yuanhao is laing
alex 拉了 《11》kg
wupeiqi 拉了 《7》kg
yuanhao 拉了 《8》kg
加上回调机制可以实现结构耦合。
阻塞是进程运行的一种状态,碰到I/O了进程就会阻塞,剥夺cpu的执行权限;
同步就是阻塞??遇到阻塞了程序就要在原地等着,同步调用也是提交完任务在原地等着啊,同步调用它只是一种提交任务的方式,比如纯计算型的也要等啊它是没有I/O的,它提交完任务之后根本不会考虑是计算型的还是I/O型的。
小练习
浏览器本质就是套接字客户端;在程序中模拟浏览器用request模块
# import requests # response = requests.get('http://www.cnblogs.com/linhaifeng') #去目标站点下载个文件 # print(response.text) from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print('GET %s' %url) response=requests.get(url) time.sleep(3) return {'url':url,'content':response.text} def parse(res): #并发下载,谁下载好了谁就去触发解析功能 res=res.result() print('%s parse res is %s' %(res['url'],len(res['content']))) if __name__ == '__main__': urls=[ 'http://www.cnblogs.com/linhaifeng', 'https://www.python.org', 'https://www.openstack.org', ] pool=ThreadPoolExecutor(2) #线程池,I/O密集型 for url in urls: pool.submit(get,url).add_done_callback(parse) #异步提交;回调函数 打印: GET http://www.cnblogs.com/linhaifeng GET https://www.python.org http://www.cnblogs.com/linhaifeng parse res is 16320 GET https://www.openstack.org https://www.python.org parse res is 49273 https://www.openstack.org parse res is 64050
协程
5个任务实现并发,放到1个线程里边;单线程是无法实现并行的;并发是看起来任务是同时运行的就可以了,其本质来回切换并保存状态。
单线程实现并发,切换+保存状态,协程要做的事情。
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它。
其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。
yield本身就是一种在单线程下可以保存任务运行状态的方法;
1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级 2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
第一种情况的切换。在任务一遇到io情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。
yield并不能实现遇到io切换

ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,
这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,
让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会*交出cpu执行权限,切换其他线程运行) 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
#并发执行 import time def producer(): g=consumer() next(g) for i in range(10000000): g.send(i) def consumer(): while True: res=yield start_time=time.time() producer() stop_time=time.time() print(stop_time-start_time) #计算的时间+切换的时间 效率低 #串行 import time def producer(): res=[] for i in range(10000000): res.append(i) return res def consumer(res): pass start_time=time.time() res=producer() consumer(res) stop_time=time.time() print(stop_time-start_time) #没有切换的时间了 打印: 1.8201038837432861 1.897108793258667
greenlet模块
只是比yield好一点
如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换。
from greenlet import greenlet import time def eat(name): print('%s eat 1' %name) time.sleep(10) g2.switch('egon') #第一次启动 print('%s eat 2' %name) g2.switch() #再切 def play(name): print('%s play 1' %name ) g1.switch() #再切回来 print('%s play 2' %name ) g1=greenlet(eat) g2=greenlet(play) g1.switch('egon') #第一次启动的时候传个参数 打印: egon eat 1 egon play 1 egon eat 2 egon play 2
greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。
gevent模块
本质就是封装了greenlet模块,它能检测I/O并且遇到I/O自动切换到另外一个任务执行;可以帮我们提升效率
from gevent import monkey;monkey.patch_all() #把下面所有的涉及I/O操作的给你打了个标记,被gevent识别 import gevent import time def eat(name): print('%s eat 1' % name) time.sleep(3) #遇到I/O立马切换到下面执行 print('%s eat 2' % name) def play(name): print('%s play 1' % name) time.sleep(4) print('%s play 2' % name) start_time=time.time() g1=gevent.spawn(eat,'egon') #异步提交的方式 g2=gevent.spawn(play,'alex') g1.join() #等待执行完 g2.join() stop_time=time.time() print(stop_time-start_time) 打印: egon eat 1 alex play 1 egon eat 2 alex play 2 4.0012288093566895
gevent异步提交任务
from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print('%s eat 1' % name) time.sleep(3) print('%s eat 2' % name) def play(name): print('%s play 1' % name) time.sleep(4) print('%s play 2' % name) g1=gevent.spawn(eat,'egon') g2=gevent.spawn(play,'alex') # time.sleep(5) # g1.join() # g2.join() gevent.joinall([g1,g2]) #相当于上边两行代码 打印: egon eat 1 alex play 1 egon eat 2 alex play 2
基于gevent模块实现并发的套接字通信
单线程、多任务的I/O操作。
#基于gevent实现 from gevent import monkey,spawn;monkey.patch_all() from socket import * def communicate(conn): while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() spawn(communicate,conn) #造一个协程对象,提交完这个对象它不会执行 server.close() if __name__ == '__main__': g=spawn(server,'127.0.0.1',8090) g.join()
##客户端 from socket import * from threading import Thread,currentThread def client(): client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8090)) while True: client.send(('%s hello' %currentThread().getName()).encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close() if __name__ == '__main__': for i in range(500): t=Thread(target=client) t.start()
I/O模型
协程是单线程下的并发,并不是对性能都有所提升,一定是检测单个线程下的多个任务的I/O,遇到I/O不要让它阻塞,给它自动切换到其他任务去;这样就能提高单个线程下的运行效率。用gevent模块来实现了,gevent是怎么检测I/O行为的呢,遇到I/O自动切换到其他任务去。
同步异步,同步调用是提交完任务在原地等着结果拿到结果后再运行下行代码;异步调用是提交完就不管了接着往下执行,异步通常跟回调机制连用,我提交完一个任务,这个任务运行完后会自动触发回调函数运行把结果交给它。
同步不定于阻塞。
阻塞I/O
#阻塞I/O 没有并发、遇到阻塞就等着 from socket import * from threading import Thread #可利用多线程实现并发 def communicate(conn): while True: try: data = conn.recv(1024) #等待 if not data: break conn.send(data.upper()) except ConnectionResetError: break conn.close() server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(5) while True: print('starting...') conn, addr = server.accept() #wait阶段主要卡在这里 print(addr) t=Thread(target=communicate,args=(conn,)) #让主线程干accept的活,每来一个链接发起个线程让它干通信的活;没有解决I/O阻塞,各个运行互不影响 t.start() #线程池保证机器在健康的状态下运行 server.close()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
非阻塞I/O
监测单线程下的I/O,帮你自动切换到另外一个任务
wait和copy阶段是怎么处理的呢
跟gevent一样。
from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #True是阻塞,False是非阻塞 print('starting...') rlist=[] wlist=[] #send在数据量大的情况下也会阻塞 while True: try: conn, addr = server.accept() #问操作系统有没有数据 ;服务端可以不停的建链接 rlist.append(conn) print(rlist) except BlockingIOError: #捕捉这个异常 #print('干其他的活') #没有数据来就可以干其他的活了, #收消息 del_rlist = [] for conn in rlist: try: data=conn.recv(1024) if not data: del_rlist.append(conn) continue wlist.append((conn,data.upper())) #存放套接字以及准备发的数据 except BlockingIOError: continue except Exception: conn.close() del_rlist.append(conn) #发消息 del_wlist=[] for item in wlist: try: conn=item[0] data=item[1] conn.send(data) del_wlist.append(item) except BlockingIOError: # pass for item in del_wlist:#发成功了把你删了 wlist.remove(item) for conn in del_rlist: #把没有数据发来的conn给删除掉 rlist.remove(conn) server.close()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8083)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
多路复用I/O模型
from socket import * import select server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) print('starting...') rlist=[server,] #专门存收消息的 wlist=[] #存发的数据 wdata={} while True: rl,wl,xl=select.select(rlist,wlist,[],0.5) #问操作系统;[]表示出异常的 print('rl',rl) #rlist会不断存conn和server一堆;wlist存一堆conn,缓冲区一旦没满证明就可以发了 print('wl',wl) #wl是可以往缓冲区send值的这种套接字 for sock in rl: if sock == server: #干链接的活 conn,addr=sock.accept() rlist.append(conn) else: try: data=sock.recv(1024) if not data: #针对linux系统 sock.close() rlist.remove(sock) #这个套接字不要监测了 continue wlist.append(sock) wdata[sock]=data.upper() except Exception: sock.close() rlist.remove(sock) ## for sock in wl: data=wdata[sock] sock.send(data) wlist.remove(sock) wdata.pop(sock) server.close()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8083)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()
异步I/O模型