Async + callbacks, thread queue, thread Event, coroutines, switching on IO in a single thread

Time: 2022-03-02 09:20:19
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import requests
# import os
# import time
# import random
#
# def get(url):
#     print('%s GET %s' %(os.getpid(),url))
#     response=requests.get(url)
#     time.sleep(random.randint(1,3))
#
#     if response.status_code == 200:
#         return response.text
#
# def parse(res):
#     print('%s parse result length: %s' %(os.getpid(),len(res)))
#
# if __name__ == '__main__':
#     urls=[
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.python.org',
#     ]
#
#     pool=ProcessPoolExecutor(4)
#     objs=[]
#     for url in urls:
#         obj=pool.submit(get,url)
#         objs.append(obj)
#
#     pool.shutdown(wait=True)
#     # Problems:
#     # 1. Return values are not processed as soon as they are ready; nothing is handled until every task has finished
#     # 2. Parsing runs serially; if one parse takes 2s, parsing 9 results takes 18s
#     for obj in objs:
#         res=obj.result()
#         parse(res)

# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import requests
# import os
# import time
# import random
#
# def get(url):
#     print('%s GET %s' %(os.getpid(),url))
#     response=requests.get(url)
#     time.sleep(random.randint(1,3))
#
#     if response.status_code == 200:
#         parse(response.text)
#
# def parse(res):
#     print('%s parse result length: %s' %(os.getpid(),len(res)))
#
# if __name__ == '__main__':
#     urls=[
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.python.org',
#     ]
#
#     pool=ProcessPoolExecutor(4)
#     for url in urls:
#         pool.submit(get,url)
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import requests
# import os
# import time
# import random
#
# def get(url):
#     print('%s GET %s' %(os.getpid(),url))
#     response=requests.get(url)
#     time.sleep(random.randint(1,3))
#
#     if response.status_code == 200:
#         # do the parsing work here
#         return response.text
#
# def parse(obj):
#     res=obj.result()
#     print('%s parse result length: %s' %(os.getpid(),len(res)))
#
# if __name__ == '__main__':
#     urls=[
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.baidu.com',
#         'https://www.python.org',
#     ]
#
#     pool=ProcessPoolExecutor(4)
#     for url in urls:
#         obj=pool.submit(get,url)
#         obj.add_done_callback(parse)
#
#     # Problems:
#     # 1. Return values are not processed as soon as they are ready; nothing is handled until every task has finished
#     # 2. Parsing runs serially; if one parse takes 2s, parsing 9 results takes 18s
#     print('main process',os.getpid())

Solving the problems:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random

def get(url):
    print('%s GET %s' %(current_thread().name,url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        # do the parsing work here
        return response.text

def parse(obj):
    res=obj.result()
    print('%s parse result length: %s' %(current_thread().name,len(res)))

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool=ThreadPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)
        obj.add_done_callback(parse)

    print('main thread',current_thread().name)

Async + callbacks
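Besides add_done_callback, concurrent.futures also provides as_completed, which yields each future as soon as it finishes, so results can be handled promptly without callbacks. A minimal sketch under the same setup (the URL list and pool size are just placeholders):

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def get(url):
    # fetch the page and return its text
    return requests.get(url).text

if __name__ == '__main__':
    urls = ['https://www.baidu.com', 'https://www.python.org']  # placeholder list
    with ThreadPoolExecutor(4) as pool:
        futures = [pool.submit(get, url) for url in urls]
        # as_completed yields each future the moment it finishes,
        # instead of waiting for pool.shutdown() and looping afterwards
        for fut in as_completed(futures):
            print('parse result length:', len(fut.result()))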

Thread queue:

1. Queue: first in, first out (FIFO)

# import queue
#
# q=queue.Queue(3)  # queue: first in, first out
# q.put(1)
# q.put(2)
# q.put(3)
# # q.put(4)  # would block: the queue is already full
#
# print(q.get())
# print(q.get())
# print(q.get())

Queue

2. Stack: last in, first out (LIFO)

# import queue
#
# q=queue.LifoQueue(3)  # stack: last in, first out
#
# q.put('a')
# q.put('b')
# q.put('c')
#
# print(q.get())
# print(q.get())
# print(q.get())

Stack

3. Priority queue: values can be put in as small tuples whose first element is the priority; the smaller the number, the higher the priority.

import queue

q=queue.PriorityQueue(3)
q.put((10,'user1'))
q.put((-3,'user2'))
q.put((-2,'user3'))

print(q.get())
print(q.get())
print(q.get())

Priority queue
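A thread queue is mainly used to pass data safely between threads. A minimal producer/consumer sketch (the function names and item count are made up for illustration; task_done and join are the standard queue.Queue methods):

import queue
from threading import Thread

q = queue.Queue(3)

def producer():
    for i in range(5):
        q.put(i)           # blocks while the queue already holds 3 items
        print('produced', i)

def consumer():
    while True:
        item = q.get()
        print('consumed', item)
        q.task_done()      # mark the item as handled for q.join()

if __name__ == '__main__':
    Thread(target=producer).start()
    Thread(target=consumer, daemon=True).start()
    q.join()               # wait until every produced item has been processed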

Event: coordination between threads

# from threading import Event,current_thread,Thread
# import time
#
# event=Event()
#
# def check():
#     print('%s checking whether the service is up....' %current_thread().name)
#     time.sleep(3)
#     event.set()
#
#
# def connect():
#     print('%s waiting to connect...' %current_thread().name)
#     event.wait()
#     print('%s connecting...' % current_thread().name)
#
# if __name__ == '__main__':
#     t1=Thread(target=connect)
#     t2=Thread(target=connect)
#     t3=Thread(target=connect)
#
#     c1=Thread(target=check)
#
#     t1.start()
#     t2.start()
#     t3.start()
#     c1.start()

from threading import Event,current_thread,Thread
import time

event=Event()

def check():
    print('%s checking whether the service is up....' %current_thread().name)
    time.sleep(5)
    event.set()

def connect():
    count=1
    while not event.is_set():
        if count == 4:
            print('too many attempts, please retry later')
            return
        print('%s connection attempt %s...' %(current_thread().name,count))
        event.wait(1)
        count+=1
    print('%s connecting...' % current_thread().name)

if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()

Event
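Event.wait(timeout) also returns True once the event is set and False on timeout, so the retry loop above can be written without checking is_set(). A minimal sketch (the 3-attempt limit and thread setup mirror the example above and are only illustrative):

from threading import Event, Thread, current_thread
import time

event = Event()

def check():
    time.sleep(5)
    event.set()

def connect():
    for attempt in range(1, 4):
        print('%s attempt %s...' % (current_thread().name, attempt))
        if event.wait(1):          # True as soon as check() calls event.set()
            print('%s connected' % current_thread().name)
            return
    print('%s gave up' % current_thread().name)

if __name__ == '__main__':
    Thread(target=check).start()
    for _ in range(3):
        Thread(target=connect).start()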

Coroutines:

1. Concurrency within a single thread: coroutines

    Concurrency means that multiple tasks appear to be running at the same time.

    The essence of concurrency: switching + saving state.

    Concurrency vs. parallelism vs. serial execution:

    Concurrency: the tasks only look simultaneous; achieved by switching + saving state.

    Parallelism: truly running at the same time, which is only possible with multiple CPUs;
      4 CPUs can run 4 tasks in parallel.

    Serial: one task runs completely to the end before the next task starts.

# import time
#
# def consumer():
#     '''Task 1: receive and process data'''
#     while True:
#         x=yield
#
#
# def producer():
#     '''Task 2: produce data'''
#     g=consumer()
#     next(g)
#     for i in range(10000000):
#         g.send(i)
#
# start=time.time()
# # yield saves state, so the two tasks switch back and forth, giving the effect of concurrency
# # PS: if each task also printed, you would clearly see the two tasks taking turns, i.e. running concurrently
# producer()  # 1.0202116966247559
#
# stop=time.time()
# print(stop-start)
# import time
#
# def consumer(res):
#     '''Task 1: receive and process data'''
#     pass
#
# def producer():
#     '''Task 2: produce data'''
#     res=[]
#     for i in range(10000000):
#         res.append(i)
#
#     consumer(res)
#     # return res
#
# start=time.time()
# # serial execution
# res=producer()
# stop=time.time()
# print(stop-start)

Coroutines
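The comment in the first snippet notes that adding prints would make the back-and-forth switching visible. A minimal sketch of two yield-based tasks taking turns (the task names and loop count are only illustrative):

def task1():
    for i in range(3):
        print('task1 step', i)
        yield               # hand control back to the driving loop

def task2():
    for i in range(3):
        print('task2 step', i)
        yield

# drive the two generators by hand: switching + saved state = apparent concurrency
g1, g2 = task1(), task2()
for _ in range(3):
    next(g1)
    next(g2)

The output alternates between task1 and task2, which is exactly the "switch + save state" effect described above.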

Switching on IO within a single thread:

# from greenlet import greenlet
# import time
#
# def eat(name):
#     print('%s eat 1' %name)
#     time.sleep(30)
#     g2.switch('alex')
#     print('%s eat 2' %name)
#     g2.switch()
# def play(name):
#     print('%s play 1' %name)
#     g1.switch()
#     print('%s play 2' %name)
#
# g1=greenlet(eat)
# g2=greenlet(play)
#
# g1.switch('egon')

# import gevent
#
# def eat(name):
#     print('%s eat 1' %name)
#     gevent.sleep(5)
#     print('%s eat 2' %name)
# def play(name):
#     print('%s play 1' %name)
#     gevent.sleep(3)
#     print('%s play 2' %name)
#
# g1=gevent.spawn(eat,'egon')
# g2=gevent.spawn(play,'alex')
#
# # gevent.sleep(100)
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])

# from gevent import monkey;monkey.patch_all()
# import gevent
# import time
#
# def eat(name):
#     print('%s eat 1' %name)
#     time.sleep(5)
#     print('%s eat 2' %name)
# def play(name):
#     print('%s play 1' %name)
#     time.sleep(3)
#     print('%s play 2' %name)
#
# g1=gevent.spawn(eat,'egon')
# g2=gevent.spawn(play,'alex')
#
# # gevent.sleep(100)
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])

from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(5)
    print('%s eat 2' %current_thread().name)
def play():
    print('%s play 1' %current_thread().name)
    time.sleep(3)
    print('%s play 2' %current_thread().name)

g1=gevent.spawn(eat)
g2=gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])

Code
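To see that the two greenlets above really overlap their IO waits, you can time the run: with monkey.patch_all() the total is roughly max(5, 3) seconds rather than 5 + 3. A minimal sketch using the same sleep lengths as above:

from gevent import monkey; monkey.patch_all()
import gevent
import time

def eat():
    time.sleep(5)   # patched: yields to the gevent hub instead of blocking the thread

def play():
    time.sleep(3)

start = time.time()
gevent.joinall([gevent.spawn(eat), gevent.spawn(play)])
# roughly 5 seconds in total, not 8: the two sleeps overlap
print('total:', round(time.time() - start, 2))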