异步回调,线程队列,协程

时间:2022-03-15 23:35:13

异步回调

1.以爬取网站数据为例

①异步提交任务,等所有任务执行完毕后,串行解析

缺点:任务的返回值不能得到即使的处理,必须等到任务都完成后,一起拿到结果,串行解析

import requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time
import random
def get(url):
    print("%s 启动"%current_thread().name)
    time.sleep(random.randint(1,2))
    res = requests.get(url)
    if res.status_code == 200:
        print("%s 结束" % current_thread().name)
        return  res.content.decode("utf-8")
    # print(response.text)#返回文本
    # print(response.content.decode("utf-8"))#可以显示中文
    
def parser(res):
    print("%s 解析结果为 %s"%(current_thread().name,len(res)))
    
if __name__ == '__main__':
    tpool = ThreadPoolExecutor(4)
    urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.tmall.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.python.org",
"https://www.apple.com"]
    objs = []
    for i in urls:
        obj = tpool.submit(get,i)#异步提交任务
        objs.append(obj)
    tpool.shutdown(wait = True)
    for obj in objs:#串行解析
        parser(obj.result())

②爬取和解析放在一个函数内,实现了并发解析,相当于给线程加了一个任务

爬取和解析耦合性强

import requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time
import random

def get(url):
    print("%s 启动"%current_thread().name)
    time.sleep(random.randint(1,2))
    res = requests.get(url).content.decode("utf-8")
    print("%s 结束" % current_thread().name)
    parser(res)
    # print(response.text)#返回文本
    # print(response.content.decode("utf-8"))#可以显示中文
def parser(res):
    print("%s 解析结果为 %s"%(current_thread().name,len(res)))
if __name__ == '__main__':
    tpool = ThreadPoolExecutor(4)
    urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.tmall.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.python.org",
"https://www.apple.com"]
    # objs = []
    for i in urls:
        obj = tpool.submit(get,i)

import requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time
import random
def get(url):
    print("%s 启动"%current_thread().name)
    time.sleep(random.randint(1,2))
    res = requests.get(url).content.decode("utf-8")
    print("%s 结束" % current_thread().name)
    return res
    # print(response.text)#返回文本
    # print(response.content.decode("utf-8"))#可以显示中文
    
    
def parser(obj):#这里只能放对象
    res = obj.result()#对象拿到返回值后,才过来调用,所以不会有阻塞
    print("%s 解析结果为 %s"%(current_thread().name,len(res)))
    
    
if __name__ == '__main__':
    tpool = ThreadPoolExecutor(4)
    urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.tmall.com",
"https://www.taobao.com",
"https://www.jd.com",
"https://www.python.org",
"https://www.apple.com"]
    for i in urls:
        obj = tpool.submit(get,i)
        obj.add_done_callback(parser)#将任务绑定方法,任务执行完毕后(拿到返回值),自动调用该方法(将拿到的返回值给方法)
 
 parser不会有阻塞,什么时候执行,其实是将对象传送给parser方法,对象执行完毕,拿到返回值,才会执行parser方法,故不会有阻塞
     

主进程交给子进程一个任务,子进程在执行完后,发一个信号给主进程,主进程调用它自己的函数

通常异步任务都会和回调函数一起使用

使用add_done_callback()给future对象绑定一个回调函数

注意在多进程中,回调函数是交给主进程执行,而多线程中,回调函数是谁有空交给谁执行,但一定不是主线程执行

线程队列:与进程队列的区别,进程队列可以被多进程共享,而线程中的队列就是一个普通的容器不能进程共享

进程队列是申请一片共享的内存空间

#1.先进先出
import queue
q = queue.Queue()
#2.后进先出
q = queue.LiFoQueue()
#3.优先级队列
q = queue.PriorityQueue()
参数元组(优先级,数值)
优先级数值越小,优先级越高

事件

#event
#用于协调多个线程间的工作
#例如一个线程要执行某个操作,需要获取另一个线程的状态
#多线程之间传送消息
from threading import Event,Thread,current_thread
import time
e = Event()
def check():
    print("%s正在检测服务器"%current_thread().name)
    time.sleep(3)
    e.set()

def connect():
    print("%s 正在连接"%current_thread().name)
    e.wait()
    print("%s 连接成功"%current_thread().name)

if __name__ == '__main__':
    t1 =Thread(target=check)
    t2 = Thread(target=check)
    c1 = Thread(target=connect)
    c2 = Thread(target=connect)
    t1.start()
    t2.start()
    c1.start()
    c2.start()
    
    
def check():
    print("%s正在检测服务器"%current_thread().name)
    time.sleep(2)
    e.set()
def connect():
    for i in range(3):
        if e.wait(1):
            print("%s 连接成功"%current_thread().name)
            break
    else:
        print("%s 连接失败"%current_thread().name)

if __name__ == '__main__':
    t1 =Thread(target=check)
    c1 = Thread(target=connect)
    t1.start()
    c1.start()
 

协程

#单线程下实现并发
#并发指的多个任务看起来同时运行的
#并发实现的本质:切换加保存状态

优点:
协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
单线程内就可以实现并发的效果,更大限度的利用cpu
缺点:
协程的本质是单线程下,无法利用多核,可以是一个程序开启多进程,每个进程内开启多个线程,每个线程内开启协程
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

greenlet模块

不能实现遇到IO切,底层封装了yield

greenlet底层封装了yield
from greenlet import greenlet
def eat(name):
    print("%s eat 1"%name)
    #import time
    #time.sleep(3)#greenlet 遇到IO并不会切换
    g2.switch("alex")
    print("%s eat 2"%name)
    g2.switch()
    
def play(name):
    print("%s play 1"%name)
    g1.switch()
    print("%s play 2"%name)

g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch("egon")#第一次切的时候需要传参数
#switch 转换

gevent模块

from gevent import monkey;monkey.patch_all()#打补丁,不打的话,无法识别IO操作
import gevent
from threading import current_thread
def eat():
    print("%s eat 1"%current_thread().name)
    time.sleep(3)
    print("%s eat 2"%current_thread().name)
    
 def play():
    print("%s play 1"%current_thread().name)
    time.sleep(3)
    print("%s play 2"%current_thread().name)
 

 print(current_thread().name)
 g1 = gevent.spawn(eat)#提交任务
 g2 = gevent.spawn(play)
 gevent.joinall([g1,g2])#异步提交,没有这步的话,发起提交任务后,可能就退出程序了,连运行都没运行
"""执行结果:
MainThread
DummyThread-1 eat 1(dummy假的,假的线程1,2)
DummyThread-2 play 1
DummyThread-1 eat 2
DummyThread-2 play 2
"""


相关文章