(并发编程)进程池线程池--提交任务2种方式+(异步回调)、协程--yield关键字 greenlet ,gevent模块

时间:2024-03-29 14:37:56
一:进程池与线程池(同步,异步+回调函数)
先造个池子,然后放任务
为什么要用“池”:池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务
池子内什么时候装进程:并发的任务属于计算密集型
池子内什么时候装线程:并发的任务属于IO密集型
#提交任务的两种方式:
    # 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
    # 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
 
p=ProcessPoolExecutor(4) 
obj=p.submit(函数名,参1,参2)
obj.add_done_callback(函数名2)
#后续回调是obj会将自身传给函数名2,所以函数名2必须有且仅有一个参数。(多进程,回调主进程干)(多线程回调,子线程们干除开主线程)
p.shutdown(wait=True)#(等同于p.close()(不允许向池中放新任务) + p.join())关闭进程池的入口,并且在原地等待进程池内所有任务运行完毕
obj.result()
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os
def task(name,n):
    print('%s%s is running' %(name,os.getpid()))
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    # print(os.cpu_count())
    p=ProcessPoolExecutor(4)
    #提交任务的两种方式:
    # 同步调用:提交完一个任务之后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的
    # 异步调用:提交完一个任务之后,不在原地等待,结果???,而是直接执行下一行代码,会导致任务是并发执行的
    l=[]
    for i in range(10):
        # 同步提交
        # res=p.submit(task,'进程pid: ',i).result()
        # print(res)
        # 异步提交
        future=p.submit(task,'进程pid: ',i)
        l.append(future)
    p.shutdown(wait=True) #关闭进程池的入口,并且在原地等待进程池内所有任务运行完毕
    for future in l:
        print(future.result())
    print('主')
'''
二、协程
1、协程是单线程实现并发
    注意:协程是程序员意淫出来的东西,操作系统里只有进程和线程的概念(操作系统调度的是线程)
    在单线程下(i/o密集型任务)实现多个任务间遇到IO就切换就可以降低单线程的IO时间,从而最大限度地提升单线程的效率
  在单线程下(计算密集型任务)切反而降低效率。
2、实现并发的三种手段:
a单线程下的并发;由程序自己控制,相对速度快
b多线程下的并发;由操作系统控制,相对速度较慢
c多进程下的并发;由操作系统控制,相对速度慢
3、基于yield保存状态,实现两个任务直接来回切换,即并发的效果 (但yield不会遇到阻塞自动切程序)
   PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield
def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
start=time.time()
producer() #1.0202116966247559
stop=time.time()
print(stop-start)
# 纯计算的任务并发执行
import time
def task1():
    res=1
    for i in range(1000000):
        res+=i
        yield
        time.sleep(10000)  #yield不会自动跳过阻塞
        print('task1')
def task2():
    g=task1()
    res=1
    for i in range(1000000):
        res*=i
        next(g)
        print('task2')
start=time.time()
task2()
stop=time.time()
print(stop-start)
4、单线程下实现遇到IO切换
 1、用greenlet(封装yield,遇到IO不自动切)
 from greenlet import greenlet
 import time
 def eat(name):
  print('%s eat 1' %name)
  time.sleep(30)
  g2.switch('alex')  #只在第一次切换时传值
  print('%s eat 2' %name)
  g2.switch()
 def play(name):
  print('%s play 1' %name)
  g1.switch()
  print('%s play 2' %name)
 g1=greenlet(eat)
 g2=greenlet(play)
 g1.switch('egon')
 2、用gevent模块(封装greenlet,不处理的话,遇到自己的IO才主动切)
 import gevent
 def eat(name):
  print('%s eat 1' %name)
  gevent.sleep(5)  #换成time.sleep(5),不会自动切
  print('%s eat 2' %name)
 def play(name):
  print('%s play 1' %name)
  gevent.sleep(3)
  print('%s play 2' %name)
 g1=gevent.spawn(eat,'egon')
 g2=gevent.spawn(play,'alex')
 # gevent.sleep(100)
 # g1.join()
 # g2.join()
 gevent.joinall([g1,g2])
5、用gevent模块(封装greenlet,处理的话,遇到其他IO也主动切)
from gevent import monkey;monkey.patch_all()
from threading import current_thread
from gevent import spawn,joinall #pip3 install gevent
import time
def play(name):
    print('%s play 1' %name)
    time.sleep(5)
    print('%s play 2' %name)
def eat(name):
    print('%s eat 1' %name)
    time.sleep(3)
    print('%s eat 2' %name)

g1=spawn(play,'刘清正')
g2=spawn(eat,'刘清正')

# g1.join()
# g2.join()
joinall([g1,g2])