阻塞 非阻塞
阻塞:程序遇到了IO操作 导致代码无法继续执行 交出了COU执行权
非阻塞:没有IO操作 或者即使遇到IO操作 也不阻塞代码执行
阻塞 就绪 运行
指的是应用程序所处的状态
写程序时 尽量减少IO操作
同步 异步
同步:发起一个任务后,必须原地等待任务执行结束 拿到一个明确的结果
异步:发起一个任务后,不需要等待,代码继续往下执行
指的是任务的发起方式
异步任务的效率高于同步
场景:当你的任务是不需要立即获取结果的 并且还有其他任务需要处理 那就发起异步任务
如何发起异步任务:多进程 多线程
# from multiprocessing import Process # from threading import Thread # # import time # # def task(): # time.sleep(2) # print("run....") # # # if __name__ == '__main__': # p = Thread(target=task) # p.start() # print("over")
# from concurrent.futures import ProcessPoolExecutor # # # import time,random # # # def task(num): # time.sleep(random.randint(1,3)) # print(num**2) # return num**2 # # if __name__ == '__main__': # # pool=ProcessPoolExecutor() # # fs=[] # for i in range(6): # f=pool.submit(task,i) # fs.append(f) # # # 方式1 # # 按照顺序来获取结果将导致一些任务可能已经完成了但是没有及时获取到结果 # # 例如第二个任务先执行完毕 但是必须要等到第一个执行结束才能拿到结果 # # for f in fs: # # print(f.result()) # # # shutdown 关闭进程池 会阻塞知道所有进程任务全部执行完毕 # # pool.shutdown(wait=True) # for f in fs: # print(f.result()) # #关闭之后不能提交新任务 # # pool.submit(task,10) # # print('over')
同步和阻塞的区别:
阻塞一定意味着CPU切走了
而同步有可能是因为计算任务比较耗时
获取异步任务结果的方式
爬虫:
1.获取到HTML文档
2.从文档中取出需要的数据
# from concurrent.futures import ThreadPoolExecutor # import requests # import threading # # # #生产 # def get_data(url): # print("%s正在处理%s" % (threading.current_thread().name, url)) # resp = requests.get(url) # print("%s获取完成" % url) # # 方式3 直接调用处理函数 生产和消费者 强行耦合在一起 # # parser_data(resp.text,url) # # return resp.text,url # # #消费 # def parser_data(f): # res = f.result() # print("解析长度为:%s,地址:%s" % (len(res[0]),res[1])) # print("当前线程:%s" % threading.current_thread().name) # # # # 要爬取的地址列表 # urls = ["https://www.baidu.com/","https://www.bilibili.com","https://www.csdn.net"] # # pool = ThreadPoolExecutor() # # # # fs = [] # for url in urls: # f = pool.submit(get_data,url) #提交任务 # f.add_done_callback(parser_data) # 绑定一个回调函数 # 方式1 把并发变成了串行 # data = f.result() # 获取结果 # parser_data(data) # 解析结果 # fs.append(f) # 方式2 必须等待全部完成 不能及时处理 # pool.shutdown() # for f in fs: # data = f.result() # parser_data(data[0],data[1])
异步回调
回调:就是回调函数
给异步任务绑定一个函数 当任务完成时会自动调用该函数
具体使用:
当你往poll中添加了一个异步任务,会返回一个表示结果的对象
有一个对象绑定方法 add_done_callback(绑定一个回调函数 会在任务结束后自动调入) 需要一个函数作为参数
注意: 回调函数 必须有且只有一个参数 就是对象本身
通过对象.result来获取结果
回调函数交给子线程执行 谁有空谁处理
优点:不用原地等待 任务结果可以立即获取到
# from threading import Thread # from multiprocessing import Process # # def task(callback): # print("run......") # callback() # # # def callback(): # print("这是回调函数.....") # # # if __name__ == '__main__': # t = Process(target=task,args=(callback,)) # t.start() # # from concurrent.futures import ProcessPoolExecutor # import os # # # def task(): # print("task....run") # # def call_back(obj): # print(os.getpid()) # print(obj.result()) # # if __name__ == '__main__': # print("主:",os.getpid()) # pool = ProcessPoolExecutor() # f = pool.submit(task) # f.add_done_callback(call_back)
线程队列:
import queue queue该模块下提供了一些常用的数据容器 但是他们仅仅是容器 每一个数据共享这个提点 from queue import Queue,LifoQueue,PriorityQueue # q = Queue() # q.put(1) # q.put(2) # print(q.get()) # print(q.get()) # print(q.get(timeout=2))
Lifoqueue() 后进先出 跟堆栈相似
# q = LifoQueue() # q.put(1) # # q.put(2) # # print(q.get())
优先级队列
需要传入一个元组类型 第一个是优先级 第二个是值
# q = PriorityQueue() # # q.put((-111111,"abc")) # # q.put((2,"abc")) # q.put(("a","hello world1")) # q.put(("A","hello world2")) # # q.put(("c","hello world")) # print(q.get())
事件是一个通知信息 表示什么时间发生了什么事情
用于线程间通讯
线程间 本来就是数据共享的 也就是说 即使没有事件这个东西 也是没有问题的
线程之间,执行流程是完全独立的 一些时候可能需要知道另一个线程发生了什么然后采取一些行动
事件其实就是帮你维护了一个bool值
在bool为True之前 wait函数将一直阻塞 这样一来就避免了不断地询问对方的状态
假设有两条线程 一个用于开启服务器 一个用于连接服务器
连接服务器一定要保证 服务器已经开启成功了 服务器启动需要花费一些时间
from threading import Thread,Event import time,random # 不使用事件 # 默认未启动 # is_boot=False # # def boot_server(): # global is_boot # print('启动服务器') # time.sleep(random.randint(1,3)) # print('服务器启动成功') # is_boot=True # # # def connect_server(): # while True: # if is_boot: # print('连接成功!') # break # else: # print('连接失败') # time.sleep(1) # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start() # 使用事件 # 一个事件 # boot = Event() # # def boot_server(): # print("正在启动服务器.......") # time.sleep(random.randint(2,5)) # print("服务器启动成功.......") # boot.set() # # # def connect_server(): # print("开始尝试连接.....") # boot.wait() # 是一个阻塞函数 会一直等到set()函数被调用 # print("连接服务器成功!") # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start()
协程就是要用单线程实现并发
GIL导致多个线程不能并行 效率低
Cpython中多个线程不能并行
既然多个线程也不乏提高执行效率 还有没有必要开线程
例子:只有一个岗位 但是有十个任务 请十个人也提高不了效率 反而增加了系统开销
现在只有一个线程 那要如何并发的处理十个任务
一个线程如何能并发?
多道技术
切换+保存状态
首先任务其实就是一堆代码 一堆代码可以组成一个函数
如何能使得可以再多个函数之间进行切换
协程指的就是一个线程完成并发执行
用于提高效率 在检测到IO操作时 切换到其他的非IO操作 这样一来 在操作系统眼里程序依旧没有阻塞
力求尽可能多的占用CPU执行权
固定使用场景:IO密集型任务
如果任务在CPython中 是计算密集型 使用协程是无法提交效率的 反而因为切换任务导致效率降低 只能靠进程了
IO密集型 多线程会比多进程效率高 因为线程的开销比进程小很多
本质上协程还是只有一个线程 所以 一旦遇到IO操作 整个线程就卡住了
协程仅在以下场景能够提交效率
1.任务是IO密集型
2.一定要可以检测IO操作并且在IO即将阻塞时 切换到计算任务 从而使得CPU尽可能多的执行你的线程
TCP的服务器端
10W线程 10个客户端
from gevent import monkey,spawn;monkey.patch_all() import time def task(): print("task run") time.sleep(3) print("task over") def task2(): print("task2 run") time.sleep(3) print("task2 over") spawn(task) spawn(task2) print("over") time.sleep(2)
greenlet 是对yield进行了封装
简化了书写 但是他不能检测IO操作
# import time # # def task1(): # for i in range(10000): # 1+1 # yield # # def task2(): # for i in range(10000): # g=task1() # 1+1 # next(g) # # # start_time=time.time() # task2() # print(time.time()-start_time)
greenlet 无法检测IO操作 gevent 即可单线程实现并发 又可以检测IO操作 import greenlet import time def task1(): print("task1 run....") time.sleep(20) g2.switch() print("task1 over....") def task2(): print("task2 run....") print("task2 over....") g1.switch() g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch()
# greenlet 无法检测IO操作 # gevent 即可单线程实现并发又可以检测IO操作 # import gevent # import gevent.monkey # # 打补丁的代码必须放在 导入模块之前 # # # import time # import threading # # # def task1(a,b): # print(a,b) # # for i in range(1000): # print("task1") # print(threading.current_thread()) # time.sleep(1) # # # def task2(): # # for i in range(1000): # print("task2") # print(threading.current_thread()) # # # g1 = gevent.spawn(task1,123,321) # g2 = gevent.spawn(task2) # # # 协程创建完成后 必须调用join函数 否则任务不会开启 可以理解为线程中的start函数 # # g.start() # # # g1.join() # # g2.join() # gevent.joinall([g1,g2]) # print("over")
形式 打补丁的代码必须放在模块之前
import gevent def task(): .... g=gevent.spawn(task) g.join() print('over')
协程创建完成后必须调用join函数 否则任务不会开启 可以理解为线程中的start函数
# import gevent.monkey # gevent.monkey.patch_all() # import socket # import time # # def task(): # s = socket.socket() # s.bind(("127.0.0.1",8989)) # s.listen() # while True: # c,addr = s.accept() # g2 = gevent.spawn(handler,c) # # g2.join() # # def handler(c): # while True: # print(c.recv(1024).decode("utf-8")) # # # # def task2(): # # while True: # # time.sleep(1) # # print("-----------") # # # g1 = gevent.spawn(task) # g1.join()