一、进程池回顾
二、同步异步
三、利用回调完成生产者消费者
四、利用回调完成生产者消费者
五、线程队列
六、协程实现
七、greenlet使用
八、gevent使用
一、进程池回顾
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
# 默认按照cpu的核数
# pool = ProcessPoolExecutor()
# #
# # def task():
# # print("%s is running" % os.getpid())
# # time.sleep(0.2)
# #
# # if __name__ == '__main__':
# # for i in range(20):
# # pool.submit(task)
# # print("11111")
# 默认按照cpu的核数
pool = ThreadPoolExecutor()
# 方式1 自己来保存数据 并执行shutdown 仅在多线程
# res = []
# def task():
# print("%s is 正在打水" % os.getpid())
# time.sleep(0.2)
# w = "%s 打的水" % os.getpid()
# res.append(w)
# return w
#
# if __name__ == '__main__':
# for i in range(20):
# # 提交任务会返回一个对象 用于回去执行状态和结果
# f = pool.submit(task)
# print(f.result()) # 方式2 执行result 它是阻塞的直到任务完成 又变成串行了
#
# print("11111")
# # pool.shutdown() # 首先不允许提交新任务 然后等目前所有任务完成后
# # print(res)
# print("over")
# 默认按照cpu的核数
pool = ThreadPoolExecutor()
# 方式3 通过回调(什么是回调 任务执行结束后自动调用某个函数)
def task():
print("%s is 正在打水" % os.getpid())
# time.sleep(0.2)
w = "%s 打的水" % os.getpid()
return w
def task_finish(res):
print("打水完成! %s" % res)
if __name__ == '__main__':
for i in range(20):
# 提交任务会返回一个对象 用于回去执行状态和结果
f = pool.submit(task)
f.add_done_callback(task_finish) #添加完成后的回调
print("11111")
# pool.shutdown() # 首先不允许提交新任务 然后等目前所有任务完成后
# print(res)
print("over")
二、同步异步
""" 线程的三种状态: 1.就绪 2.运行 3.阻塞 阻塞 遇到了IO操作 代码卡主 无法执行下一行 CPU会切换到其他任务 非阻塞 与阻塞相反 代码正在执行(运行状态) 或处于就绪状态 阻塞和非阻塞描述的是运行的状态 同步 :提交任务必须等待任务完成,才能执行下一行 异步 :提交任务不需要等待任务完成,立即执行下一行 指的是一种提交任务的方式 """ def task(): for i in range(1000000): i += 1000 print("11111") print("start") task() # 同步提交方式 print("end") from threading import Thread print("start1") Thread(target=task).start() # 异步提交 print("end1")
三、利用回调完成生产者消费者
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread import os pool = ThreadPoolExecutor() # 爬虫 1.从网络某个地址获取一个HTML文件 import requests # 该模块用于网络(HTTP)请求 # 生产数据 def get_data_task(url): print(os.getpid(),"正在生产数据!") # print(current_thread(),"正在生产数据!") response = requests.get(url) text = response.content.decode("utf-8") print(text) return text # 处理数据 def parser_data(f): print(os.getpid(),"处理数据") # print(current_thread(), "处理数据") print("正在解析: 长度%s" % len(f.result())) urls = [ "http://www.baidu.com", "http://www.baidu.com", "http://www.baidu.com", "http://www.baidu.com" ] if __name__ == '__main__': for url in urls: f = pool.submit(get_data_task,url) f.add_done_callback(parser_data) # 回调函数是主进程在执行 # 因为子进程是负责获取数据的 然而数据怎么处理 子进程并不知道 应该把数据还给主进程 print("over")
四、利用回调完成生产者消费者
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import current_thread pool = ThreadPoolExecutor() # 爬虫 1.从网络某个地址获取一个HTML文件 import requests # 该模块用于网络(HTTP)请求 # 生产数据 def get_data_task(url): # print(os.getpid(),"正在生产数据!") print(current_thread(),"正在生产数据!") response = requests.get(url) text = response.content.decode("utf-8") print(text) return text # 处理数据 def parser_data(f): # print(os.getpid(),"处理数据") print(current_thread(), "处理数据") print("正在解析: 长度%s" % len(f.result())) urls = [ "http://www.baidu.com", "http://www.baidu.com", "http://www.baidu.com", "http://www.baidu.com" ] if __name__ == '__main__': for url in urls: f = pool.submit(get_data_task,url) f.add_done_callback(parser_data) # 回调函数是主进程在执行 # 因为子进程是负责获取数据的 然而数据怎么处理 子进程并不知道 应该把数据还给主进程 print("over")
五、线程队列
import queue # 普通队列 先进先出 q = queue.Queue() q.put("a") q.put("b") print(q.get()) print(q.get()) # 堆栈队列 先进后出 后进先出 函数调用就是进栈 函数结束就出栈 递归造成栈溢出 q2 = queue.LifoQueue() q2.put("a") q2.put("b") print(q2.get()) # 优先级队列 q3 = queue.PriorityQueue() # 数值越小优先级越高 优先级相同时 比较大小 小的先取 q3.put((-100,"c")) q3.put((1,"a")) q3.put((100,"b")) print(q3.get())
六、协程实现
import time def task(): while True: print("task1") time.sleep(4) yield 1 def task2(): g = task() while True: try: print("task2") next(g) except Exception: print("任务完成") break task2()
七、greenlet使用
import greenlet import time def task1(): print("task1 1") time.sleep(2) g2.switch() print("task1 2") g2.switch() def task2(): print("task2 1") g1.switch() print("task2 2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch() # 1.实例化greenlet得到一个对象 传入要执行的任务 # 至少需要两个任务 # 2.先让某个任务执行起来 使用对象调用switch # 3.在任务的执行过程中 手动调用switch来切换 #
八、gevent使用
from gevent import monkey monkey.patch_all() import gevent import time def eat(): print('eat food 1') time.sleep(2) # gevent.sleep(1) print('eat food 2') def play(): print('play 1') time.sleep(1) # gevent.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) # g1.join() # g2.join() gevent.joinall([g1,g2]) print('主') # 1.spawn函数传入你的任务 # 2.调用join 去开启任务 # 3.检测io操作需要打monkey补丁 就是一个函数 在程序最开始的地方调用它