一、守护进程和守护线程
1)守护进程的概念
什么是守护进程:
守护: 在主进程代码结束情况下,就立即死掉
守护进程本质就是一个子进程,该子进程守护着主进程
为何要用守护进程
守护进程本质就是一个子进程,所以在主进程需要将任务并发执行的时候需要开启子进程
当该子进程执行的任务生命周期伴随主进程整个生命周期的时候,就需要将该子进程做成守护的进程
2)创建守护进程
from multiprocessing import Process import time def task(x): print('%s is running' %x) time.sleep(1) print('%s is done' % x) if __name__ == '__main__': p=Process(target=task,args=('守护进程',)) p.daemon=True # 必须放到p.start()之前 p.start() time.sleep(3) print('主')
3)守护线程的概念
主线程要等到该进程内所有非守护线程(子线程)都死掉才算死掉,因为主线程的生命周期
代表了该进程的生命周期,该进程一定是要等到所有非守护的线程都干完活才应该死掉
可以简单理解为:
守护线程是要等待该进程内所有非守护的线程都运行完毕才死掉
4)创建守护线程
from threading import Thread import time def task(x): print('%s is running' %x) time.sleep(3) print('%s is done' % x) if __name__ == '__main__': t=Thread(target=task,args=('守护线程',)) t.daemon=True # 必须放到p.start()之前 t.start() print('主')
二、互斥锁和信号量与GIL全局解释器锁,死锁及递归锁
1)互斥锁的意义
互斥锁的原理是将进程/线程内执行的部分代码由并发执行变成穿行执行,牺牲了效率但保证数据安全
互斥锁不能连续低执行mutex.acquire()操作,必须等到拿着锁的进程释放锁mutex.release()其他进程才能抢到
2)进程mutex=Lock()
from multiprocessing import Process,Lock import json import os import time import random mutex=Lock() def check(): with open('db.json','rt',encoding='utf-8') as f: dic=json.load(f) print('%s 剩余票数:%s' %(os.getpid(),dic['count'])) def get(): with open('db.json','rt',encoding='utf-8') as f: dic=json.load(f) time.sleep(1) if dic['count'] > 0: dic['count']-=1 time.sleep(random.randint(1,3)) #模拟网络延迟 with open('db.json','wt',encoding='utf-8') as f: json.dump(dic,f) print('%s 抢票成功' %os.getpid()) def task(mutex): # 并发查看 check() # 串行购票 mutex.acquire() get() mutex.release() if __name__ == '__main__': for i in range(7): p=Process(target=task,args=(mutex,)) p.start() # p.join() # 将p内的代码变成整体串行
3)信号量。设置能同时执行任务的数量
# 比如公共厕所,能同时上厕所的有4个位置 from multiprocessing import Process,Semaphore import os import time import random sm=Semaphore(4) def go_wc(sm): sm.acquire() print('%s is wcing' %os.getpid()) time.sleep(random.randint(1,3)) sm.release() if __name__ == '__main__': for i in range(20): p=Process(target=go_wc,args=(sm,)) p.start()
4)线程问题版。线程中修改同一个数据,数据存在不安全,故障
from threading import Thread import time n = 100 def task(): global n temp = n time.sleep(0.1) n = temp -1 if __name__ == '__main__': t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() print(n)
5)线程互斥锁修改版
from threading import Thread,Lock import time mutex = Lock() n = 100 def task(): global n with mutex: # 拿到锁,自动释放锁 temp = n time.sleep(0.1) n = temp -1 if __name__ == '__main__': t_l = [] start_time = time.time() for i in range(50): t = Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() print(n) print(time.time()- start_time)
6)GIL的意义。判断什么情况下使用线程和进程
1 GIL是什么 GIL是全局解释器锁,本质就是一把互斥锁 GIL是Cpython解释器的特性,而不是python的特性 每启动一个进程,该进程就会有一个GIL锁,用来控制该进程内的多个线程同一时间只有一个执行 这意味着Cpython解释器的多线程没有并行的效果,但是有并发的效果 2、为什么要有GIL 因为Cpython解释器的垃圾回收机制不是线程安全的 3、GIL vs 自定义互斥锁 在一个进程内的多个线程要想执行,首先需要抢的是GIL,GIL就相当于执行权限 python的多进程用于计算密集型 python的多线程用于IO密集型
7)计算密集型中,进程计算和线程计算对比
进程计算,计算密集型。利用多核cpu的优势,但进程数不能超过核数的2倍,会大量消耗cpu资源。计算时间 27.400567293167114
from multiprocessing import Process import time def task1(): res=1 for i in range(100000000): res*=i def task2(): res = 1 for i in range(100000000): res += i def task3(): res = 1 for i in range(100000000): res -= i def task4(): res = 1 for i in range(100000000): res += i if __name__ == '__main__': start_time=time.time() p1=Process(target=task1) p2=Process(target=task2) p3=Process(target=task3) p4=Process(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() stop_time=time.time() print(stop_time-start_time) #27.400567293167114
线程进行密集计算。计算时间 86.84396719932556
import time from threading import Thread def task1(): res=1 for i in range(100000000): res*=i def task2(): res = 1 for i in range(100000000): res += i def task3(): res = 1 for i in range(100000000): res -= i def task4(): res = 1 for i in range(100000000): res += i if __name__ == '__main__': start_time=time.time() p1=Thread(target=task1) p2=Thread(target=task2) p3=Thread(target=task3) p4=Thread(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() stop_time=time.time() print(stop_time-start_time) # 86.84396719932556
8)IO密集型中。进程与线程对比
进程完成时间 3.5172011852264404
import time from multiprocessing import Process def task1(): time.sleep(3) def task2(): time.sleep(3) def task3(): time.sleep(3) def task4(): time.sleep(3) if __name__ == '__main__': start_time=time.time() p1=Process(target=task1) p2=Process(target=task2) p3=Process(target=task3) p4=Process(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() stop_time=time.time() print(stop_time-start_time) # 3.5172011852264404
线程优势,完成时间 3.003171443939209
import time from threading import Thread def task1(): time.sleep(3) def task2(): time.sleep(3) def task3(): time.sleep(3) def task4(): time.sleep(3) if __name__ == '__main__': start_time=time.time() p1=Thread(target=task1) p2=Thread(target=task2) p3=Thread(target=task3) p4=Thread(target=task4) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() stop_time=time.time() print(stop_time-start_time) # 3.003171443939209
9)死锁现象。释放锁之前,都需要获取到对方的锁,造成了无法释放锁
from threading import Thread,Lock import time mutexA = Lock() mutexB = Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 抢到了A锁' %self.name) mutexB.acquire() print('%s 抢到了B锁' % self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 抢到了B锁' % self.name) time.sleep(1) mutexA.acquire() print('%s 抢到了A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(2): t = Mythread() t.start()
10)递归锁。RLock,解决死锁现象。递归锁可以连续acquire()
from threading import Thread,RLock import time mutexA = mutexB = RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 抢到了A锁' %self.name) mutexB.acquire() print('%s 抢到了B锁' % self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 抢到了B锁' % self.name) time.sleep(1) mutexA.acquire() print('%s 抢到了A锁' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(2): t = Mythread() t.start()
三、IPC机制或队列和生产者模型
IPC:进程间通信,有两种解决方案:队列、管道
1)队列,先进先出。应用于生产者模型
from multiprocessing import Queue q=Queue(maxsize=3) q.put({'x':1}) q.put(2) q.put('third') print(q.get()) print(q.get()) print(q.get())
默认不加参数,超过队列最大值会堵塞。 q.put(1,block=False) 超过最大值,程序中断。效果等同于 q.put_nowait(1)。
timeout=3 超时时间,block=True的时候,才有意义
2)生产者模型的意义
1、什么是生产者消费者模型 生产者消费者模型指的是一种解决问题的思路 该模型中包含两类明确的角色: 1、生产者:创造数据的任务 2、消费者:处理数据的任务 2、为什么要用生产者消费者模型? 1、实现生产者与消费者任务的解耦和 2、平衡了生产者的生产力与消费者消费力 一旦程序中出现明显的两类需要并发执行的任务,一类是负责数据的,另外一类是负责处理数据的 那么就可以使用生产者消费者模型来提升执行效率 3、如何用 生产者----》队列《-------消费者 队列 1、队列占用的是内存控制,即便是不指定队列的大小也不可能无限制地放数据 2、队列是用来传递消息的介质,即队列内存放的是数据量较小的数据
2)生产者模型Queue,消费者卡住的不完善版本
from multiprocessing import Queue,Process import time def producer(name,q): for i in range(5): res = '包子%s' %i time.sleep(0.5) print('\033[45m厨师%s 生成了%s\033[0m' %(name,res)) q.put(res) def consumer(name,q): while True: res = q.get() time.sleep(1) print('\033[47m吃货%s 吃了%s\033[0m'%(name,res)) if __name__ == '__main__': q = Queue() # 生产者们 p1 = Process(target=producer,args=('egon',q)) # 消费者们 c1 = Process(target=consumer,args=('alex',q)) p1.start() c1.start() print("主")
3)low版生产者模型Queue,结束信号None
from multiprocessing import Queue,Process import time def producer(name,food,q): for i in range(5): res = '%s%s' %(food,i) time.sleep(0.5) print('\033[45m厨师%s 生成了%s\033[0m' %(name,res)) q.put(res) def consumer(name,q): while True: res = q.get() time.sleep(1) print('\033[47m吃货%s 吃了%s\033[0m'%(name,res)) if __name__ == '__main__': q = Queue() # 生产者们 p1 = Process(target=producer, args=('egon','蛋糕',q)) p2 = Process(target=producer, args=('lxx','面包' ,q)) p3 = Process(target=producer, args=('cxx','炸弹' ,q)) # 消费者们 c1 = Process(target=consumer,args=('alex',q)) c2 = Process(target=consumer, args=('wcc', q)) p1.start() pfile:/D:/oldboyedu/manth-03/day-03/tet2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) print('主') print("主")
4)生产者模型最终版本 JoinableQueue,以守护进程的方式来结束
from multiprocessing import JoinableQueue,Process import time def producer(name,food,q): for i in range(5): res = '%s%s' %(food,i) time.sleep(0.5) print('\033[45m厨师%s 生成了%s\033[0m' %(name,res)) q.put(res) def consumer(name,q): while True: res = q.get() time.sleep(1) print('\033[47m吃货%s 吃了%s\033[0m'%(name,res)) q.task_done() # 消费者拿了一个,队列就少了一个 if __name__ == '__main__': q = JoinableQueue() # 生产者们 p1 = Process(target=producer, args=('egon','蛋糕',q)) p2 = Process(target=producer, args=('lxx','面包' ,q)) p3 = Process(target=producer, args=('cxx','炸弹' ,q)) # 消费者们 c1 = Process(target=consumer,args=('alex',q)) c2 = Process(target=consumer, args=('wcc', q)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 此时 3个生产者都已经生产完了 q.join() # 1、证明生产者都已经完全生产完毕 2、队列为空,也就是消费者也消费完毕 print('主')
四、线程queue
1)队列:先进先出
import queue q=queue.Queue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
2)堆栈:先进后出
import queue q=queue.LifoQueue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
3)优先级队列:优先级高的优先出来
import queue q=queue.PriorityQueue(3) q.put((13,'lxx')) q.put((10,'egon')) #数字代表优先级,数字越小优先级越高 q.put((11,'alex')) print(q.get()) print(q.get()) print(q.get())
五、进程池与线程池
1)池的概念
1、什么进程池、线程池 池指的一个容器,该容器用来存放进程或线程,存放的数目是一定的 2、为什么要用池 用池是为了将并发的进程或线程数目控制在计算机可承受的范围内 为何要用进程进池? 当任务是计算密集型的情况下应该用进程来利用多核优势 为何要用线程进池? 当任务是IO密集型的情况下应该用线程减少开销
2)同步与异步
同步调用 vs 异步调用 异步调用与同步调用指的是提交任务的两种方式 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码 同步调用下任务的执行是串行执行 异步调用:提交完任务后,不会原地等待任务执行完毕,结果 futrue = p.submit(task,i),结果记录在内存中, 直接执行下一行代码 同步调用下任务的执行是并发执行
3)异步进程池
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(x): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return x**2 if __name__ == '__main__': p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数 futrues = [] # 保存任务返回值 for i in range(10): futrue = p.submit(task,i) # 提交任务,异步提交 futrues.append(futrue) # 保存任务返回值 p.shutdown(wait=True) # 关闭了继续提入口交任务的,wait=True 把进程池的里的事做完,再执行后面的任务 for futrue in futrues: print(futrue.result()) # 输入返回结果值 print('主')
4)同步进程池
from concurrent.futures import ProcessPoolExecutor import os import time import random def task(x): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return x**2 if __name__ == '__main__': p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数 for i in range(10): res = p.submit(task,i).result() # 提交任务,异步提交 print(res) print('主')
小结,同步与异步,对于获取任务返回值的方式,在于什么时候 obj.result()。
6)回调函数,进程池,解析任务返回值。add_done_callback(parse)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os import time import requests def get(url): print('%s GET %s' %(os.getpid(),url)) time.sleep(2) response=requests.get(url) if response.status_code == 200: res=response.text return res def parse(obj): res = obj.result() print('%s 解析[url]结果是 %s' % (os.getpid(), len(res))) if __name__ == '__main__': p=ProcessPoolExecutor(3) urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.taobao.com', 'https://www.jd.com', ] for url in urls: p.submit(get,url).add_done_callback(parse) # 回调函数会在任务运行完毕后自动触发,并且接收该任务对象 print('主',os.getpid())
7)回调函数,线程池,解析任务返回值。add_done_callback(parse)
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time import random def task(x): print('%s is running' %current_thread().getName()) time.sleep(random.randint(1,3)) return x**2 def parse(obj): res=obj.result() print('%s 解析的结果为%s' %(current_thread().getName(),res)) if __name__ == '__main__': t=ThreadPoolExecutor(3) for i in range(10): t.submit(task,i).add_done_callback(parse)
六、补充知识
1)线程event事件。一个线程发了一个信号,另外个线程收到该信号,才能继续执行
from threading import Event,current_thread,Thread import time event=Event() # 生成信号事件 def check(): print('%s 正在检测服务是否正常....' %current_thread().name) time.sleep(3) event.set() # 发送信号 def connect(): print('%s 等待连接...' %current_thread().name) event.wait() # 接收信号 print('%s 开始连接...' % current_thread().name) if __name__ == '__main__': t1=Thread(target=connect) t2=Thread(target=connect) t3=Thread(target=connect) c1=Thread(target=check) t1.start() t2.start() t3.start() c1.start()
2)基于上面内容,设置尝试次数
from threading import Event,current_thread,Thread import time event=Event() def check(): print('%s 正在检测服务是否正常....' %current_thread().name) time.sleep(2) event.set() def connect(): count=1 while not event.is_set(): if count == 4: print('尝试的次数过多,请稍后重试') return print('%s 尝试第%s次连接...' %(current_thread().name,count)) event.wait(1) count+=1 print('%s 开始连接...' % current_thread().name) if __name__ == '__main__': t1=Thread(target=connect) t2=Thread(target=connect) t3=Thread(target=connect) c1=Thread(target=check) t1.start() t2.start() t3.start() c1.start()
七)协程介绍(单线程下并发)
单线程下实现并发:协程 并发指的多个任务看起来是同时运行的 并发实现的本质:切换+保存状态 并发、并行、串行: 并发:看起来是同时运行,切换+保存状态 并行:真正意义上的同时运行,只有在多cpu的情况下才能 实现并行,4个cpu能够并行4个任务 串行:一个人完完整整地执行完毕才运行下一个任务 实现方法: 基于yield保存状态,实现两个任务直接来回切换,即并发的效果 PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
1)gevent模拟单线程并发(协程),from gevent import monkey;monkey.patch_all(),监控IO,实现单线程的并发操作
from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print('%s eat 1' %name) time.sleep(3) print('%s eat 2' % name) def play(name): print('%s play 1' %name) time.sleep(5) print('%s play 2' % name) g1 = gevent.spawn(eat,'egon') g2 = gevent.spawn(play,'alex') gevent.joinall([g1,g2])
2)DummyThread(单线程的并发:spawn,实现的是假线程)
from gevent import monkey;monkey.patch_all() from threading import current_thread import gevent import time def eat(): print('%s eat 1' %current_thread().name) time.sleep(3) print('%s eat 2' %current_thread().name) def play(): print('%s play 1' %current_thread().name) time.sleep(5) print('%s play 2' % current_thread().name) g1 = gevent.spawn(eat) g2 = gevent.spawn(play) print(current_thread().name) gevent.joinall([g1,g2])
3)socket连接,单线程下的并发,测试连接抗压能力
from gevent import monkey,spawn;monkey.patch_all() from threading import Thread from socket import * def talk(conn): while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port,backlog=5): s = socket() s.bind((ip,port)) s.listen(backlog) while True: conn, addr = s.accept() print(addr) # 通信 g=spawn(talk,conn) s.close() if __name__ == '__main__': spawn(server,'127.0.0.1',8080).join() # server(('127.0.0.1',8080))
from threading import Thread,current_thread from socket import * import os def client(): client = socket() client.connect(('127.0.0.1', 8080)) while True: data = '%s hello' % current_thread().name client.send(data.encode('utf-8')) res = client.recv(1024) print(res.decode('utf-8')) if __name__ == '__main__': for i in range(200): t=Thread(target=client) t.start()
4)网络IO非堵塞模型,实现单线程的并发。s.setblocking(False),与from gevent import monkey;monkey.patch_all()的原理一样
from socket import * s = socket() s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) r_list=[] while True: try: conn, addr = s.accept() r_list.append(conn) except BlockingIOError: print('可以去干其他的活了') print('rlist: ',len(r_list)) for conn in r_list: try: data=conn.recv(1024) conn.send(data.upper()) except BlockingIOError: continue
from socket import * import os client = socket() client.connect(('127.0.0.1', 8080)) while True: data='%s say hello' %os.getpid() client.send(data.encode('utf-8')) res=client.recv(1024) print(res.decode('utf-8'))
5)网络IO非堵塞模型,修正版,收消息与发消息区分开
from socket import * s = socket() s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) r_list=[] w_list=[] while True: try: conn, addr = s.accept() r_list.append(conn) except BlockingIOError: print('可以去干其他的活了') print('rlist: ',len(r_list)) # 收消息 del_rlist=[] for conn in r_list: try: data=conn.recv(1024) if not data: conn.close() del_rlist.append(conn) continue w_list.append((conn,data.upper())) except BlockingIOError: continue except ConnectionResetError: conn.close() del_rlist.append(conn) # 发消息 del_wlist=[] for item in w_list: try: conn=item[0] res=item[1] conn.send(res) del_wlist.append(item) except BlockingIOError: continue except ConnectionResetError: conn.close() del_wlist.append(item) # 回收无用连接 for conn in del_rlist: r_list.remove(conn) for item in del_wlist: w_list.remove(item)
from socket import * import os client = socket() client.connect(('127.0.0.1', 8080)) while True: data='%s say hello' %os.getpid() client.send(data.encode('utf-8')) res=client.recv(1024) print(res.decode('utf-8'))
6)IO多路复用。select模块优化上面的内容
from socket import * import select s = socket() s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) # print(s) r_list=[s,] w_list=[] w_data={} while True: print('被检测r_list: ',len(r_list)) print('被检测w_list: ',len(w_list)) rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn] # print('rl: ',len(rl)) #rl=[conn,] # print('wl: ',len(wl)) # 收消息 for r in rl: #r=conn if r == s: conn,addr=r.accept() r_list.append(conn) else: try: data=r.recv(1024) if not data: r.close() r_list.remove(r) continue # r.send(data.upper()) w_list.append(r) w_data[r]=data.upper() except ConnectionResetError: r.close() r_list.remove(r) continue # 发消息 for w in wl: w.send(w_data[w]) w_list.remove(w) w_data.pop(w)
from socket import * import os client = socket() client.connect(('127.0.0.1', 8080)) while True: data='%s say hello' %os.getpid() client.send(data.encode('utf-8')) res=client.recv(1024) print(res.decode('utf-8'))
八、目前知识总结,项目篇