并发编程(六)--进程/线程池、协程、gevent第三方库

时间:2024-11-26 11:34:37

一、进程/线程池

1、进程池

(1)什么是进程池

如果需要创建的子进程数量不大,可以直接利用multiprocess中的Process来创建。但是当需要创建上百个或上千个,手动创建就较为繁琐,这时就可以利用进程池来创建,即current.futures模块中的ProcessPoolExecutor

(2)ProcessPoolExecutor的基本方法

1. submit(fn,*args,**kwargs)        # 异步提交任务

2. map(func, *iterables, timeout=None, chunksize=1)      #取代for循环submit的操作

3. shutdown(wait=True)      # 相当于pool.close()+pool.join()
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前 4. result() # 获取结果 5. add_done_callback(fn) # 回调函数,在任务运行完毕后自动触发,传给fn函数一个future,并执行fn函数

(3)进程池的使用

  • 利用列表存放future,从而获取结果
from concurrent.futures import ProcessPoolExecutor
import time,os,random def task(name):
print('%s %s is running'%(name,os.getpid()))
time.sleep(random.randint(1,3)) if __name__ == '__main__':
l=[]
p=ProcessPoolExecutor(5)
for i in range(10):
future=p.submit(task,'进程')
l.append(future)
p.shutdown(wait=True)
for future in l:
print(future.result())
print('主') # 进程 9228 is running
# 进程 7840 is running
# 进程 10000 is running
# 进程 9964 is running
# 进程 9628 is running
# 进程 10000 is running
# 进程 9628 is running
# 进程 9228 is running
# 进程 7840 is running
# 进程 10000 is running
# None
# None
# None
# None
# None
# None
# None
# None
# None
# None
# 主

进程池ProcessPoolExecutor

  • 利用回调函数来获取结果
from concurrent.futures import ProcessPoolExecutor
import time, os, random def task(name):
print('%s %s is running' % (name, os.getpid()))
time.sleep(random.randint(1, 3)) def parse(future):
time.sleep(1)
res = future.result()
print('%s 结果为 %s' % (os.getpid(), res)) if __name__ == '__main__':
p = ProcessPoolExecutor(4)
for i in range(10):
future = p.submit(task, 'Jack')
future.add_done_callback(parse)
p.shutdown(wait=True)
print('主',os.getpid()) # Jack 9928 is running
# Jack 9640 is running
# Jack 9760 is running
# Jack 5844 is running
# Jack 9928 is running
# 9748 结果为 None
# Jack 9640 is running
# Jack 9760 is running
# Jack 9928 is running
# 9748 结果为 None
# Jack 5844 is running
# Jack 9640 is running
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 9748 结果为 None
# 主 9748

补充:

提交任务的两种方式:
  同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
  异步调用:提交完一个任务之后,不在原地等待,而是直接执行下一行代码,会导致任务是并发执行的

 2、线程池

(1)什么是线程池

当有大量并发任务需要处理时,再使用传统的多线程就会造成大量的资源创建销毁导致服务器效率的下降。这时候,线程池就派上用场了。线程池技术为线程创建、销毁的开销问题和系统资源不足问题提供了很好的解决方案。

(2)用法

线程池的用法和进程池差不多

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time,random,os
import requests def get(url):
print('%s GET %s' %(current_thread().name,url))
time.sleep(3)
response=requests.get(url)
if response.status_code == 200:
res=response.text
else:
res='下载失败'
return res def parse(future):
time.sleep(1)
res=future.result()
print('%s 解析结果为%s' %(current_thread().name,len(res))) if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.sina.com.cn',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.python.org',
'https://www.openstack.org',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com', ] p=ThreadPoolExecutor(4) for url in urls:
future=p.submit(get,url) #异步调用:提交完一个任务之后,不在原地等待,而是直接执行下一行代码,会导致任务是并发执行的,,结果futrue对象会在任务运行完毕后自动传给回调函数
future.add_done_callback(parse) #parse会在任务运行完毕后自动触发,然后接收一个参数future对象 p.shutdown(wait=True) print('主',current_thread().name)

线程池ThreadProcessExecutor

3、map方法

map方法无法实现获取结果

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11):
# future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit

map方法

4、回调函数

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os def get_page(url):
print('<进程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text} def parse_page(res):
res=res.result()
print('<进程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res) if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
p=ProcessPoolExecutor(3)
for url in urls:
future=p.submit(get_page,url)
future.add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

回调函数

二、协程

 1、什么是协程

协程是单线程下实现并发,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的,只是程序员总结出来的概念

操作系统里只有进程和线程的概念(操作系统调度的是线程)

2、协程的特点

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

3、协程的优缺点

优点如下:

#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点如下:

#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

三、gevent模块

1、gevent作用

实现同单线程中多个任务之间的切换。

例如:单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2……如此,才能提高效率,这就用到了Gevent模块。

2、gevent用法

g1=gevent.spawn(func,1,,2,3,x=4,y=5) #创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,
可以是位置实参或关键字实参,都是传给函数eat g2=gevent.spawn(func2) g1.join() #等待g1结束 g2.join() #等待g2结束 #或者上述两步合作一步:gevent.joinall([g1,g2]) g1.value #拿到func1的返回值

2、gevent实现单线程下任务间切换

gevent无法直接识别time.sleep或者其他阻塞,要识别就必须打补丁,即from gevent import monkey;monkey.patch_all(),必须放到被打补丁者的前面,如time,socket模块之前,或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

from gevent import monkey;monkey.patch_all()
from gevent import spawn,joinall
import time def eat(name='tom'):
print('%s 正在吃' % (name))
time.sleep(1)
print("%s 吃完了" % name) def drink(name='jack'):
print('%s 正在喝' % (name))
time.sleep(1)
print("%s 喝完了" % name) start = time.time()
g1 = spawn(eat)
g2 = spawn(drink)
# g1.join()
# g2.join()
joinall([g1, g2])
print(time.time() - start) # 1.0150582790374756

四、单线程下实现并发的套接字通信

from gevent import monkey;monkey.patch_all()
from socket import *
from gevent import spawn def comunicate(conn):
while True: # 通信循环
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close() def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog) while True: # 链接循环
conn, client_addr = server.accept()
print(client_addr) # 通信
spawn(comunicate,conn) if __name__ == '__main__':
g1=spawn(server,'127.0.0.1',8080)
g1.join()

服务端

from threading import Thread,current_thread
from socket import * def client():
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080)) n=0
while True:
msg='%s say hello %s' %(current_thread().name,n)
n+=1
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8')) if __name__ == '__main__':
for i in range(500):
t=Thread(target=client)
t.start()

客户端