4月28日 python学习总结 线程与协程

时间:2020-11-28 00:00:49

一、 异步与回调机制  

 问题:

 1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理

 2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s

解决一: (线程实现异步,回调解析结果)    

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import requests
import os
import time
import random

def get(url):
    print('%s GET %s' %(current_thread().name,url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        # 干解析的活
        return response.text

def pasrse(obj):
    res=obj.result()
    print('%s 解析结果为:%s' %(current_thread().name,len(res)))

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]
    pool=ThreadPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)             #放入进程池,实现异步操作
        obj.add_done_callback(pasrse)    #回调,将线程执行结果当作参数传递给pasrse函数,线程是谁先空闲谁执行结果处理,不存在主次之分

    print('主线程',current_thread().name)

   

     解决二: (进程实现异步,回调解析结果)

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random

def get(url):
    print('%s GET %s' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        # 干解析的活
        return response.text

def pasrse(obj):
    res=obj.result()
    print('%s 解析结果为:%s' %(os.getpid(),len(res)))

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.python.org',
    ]

    pool=ProcessPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)           #放入进程池,实现异步操作
        obj.add_done_callback(pasrse)      #回调,将进程执行结果当作参数传递给pasrse函数,由主进程执行

    print('主进程',os.getpid())

 

二、线程queue

import queue

q=queue.Queue(3) #队列:先进先出
q.put(1)
q.put(2)
q.put(3)
# q.put(4)

print(q.get())
print(q.get())
print(q.get())

q=queue.LifoQueue(3) #堆栈:后进先出

q.put('a')
q.put('b')
q.put('c')

print(q.get())
print(q.get())
print(q.get())

q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
q.put((10,'user1'))
q.put((-3,'user2'))
q.put((-2,'user3'))

print(q.get())
print(q.get())
print(q.get())

 

三、线程Event   

from threading import Event,current_thread,Thread
import time

event=Event()      # 监听信号 初始值为False

def check():
    print('%s 正在检测服务是否正常....' %current_thread().name)
    time.sleep(5)
    event.set()         #set 方法将信号值 置为True


def connect():
    count=1
    while not event.is_set():        #判断标记为是否为True
        if count ==  4:
            print('尝试的次数过多,请稍后重试')
            return
        print('%s 尝试第%s次连接...' %(current_thread().name,count))
        event.wait(1)             #括号里的是等待时间,程序想继续运行,除非标志位为True或者超时,此处超时不会报错,是继续执行
        count+=1
    print('%s 开始连接...' % current_thread().name)

if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()

 

四、协程    

1、单线程下实现并发:协程

并发指的多个任务看起来是同时运行的

并发实现的本质:切换+保存状态

 

2、并发、并行、串行:

并发:看起来是同时运行,切换+保存状态
并行:真正意义上的同时运行,只有在多cpu的情况下才能
实现并行,4个cpu能够并行4个任务

串行:一个人完完整整地执行完毕才运行下一个任务

import time
def consumer():
     '''任务1:接收数据,处理数据'''
     while True:
         x=yield
 
 
def producer():
     '''任务2:生产数据'''
     g=consumer()
     next(g)
     for i in range(10000000):
        g.send(i)

 start=time.time()
 #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
 #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
 producer() #1.0202116966247559

 stop=time.time()
 print(stop-start)

 

    并不是所有协程都能提升效率,如果是IO密集型的,协程会提高执行效率,然而计算密集型的切换并不能提高效率,反而会降低效率                                                                        

五、单线程下实现遇到IO切换

    1、greentlet可以切换,但不能遇到IO切  

from greenlet import greenlet
import time

def eat(name):
    print('%s eat 1' %name)
    time.sleep(30)
    g2.switch('alex')        #遇到switch切换
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')

·    

    2、gevent切换,只能识别自己的IO操作,无法数别系统定义的IO,如time.sleep()

import gevent

def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(5)        #gevent自定义的IO  可切换
    print('%s eat 2' %name)
def play(name):  
    print('%s play 1' %name)
    gevent.sleep(3)
    print('%s play 2' %name)

g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,'alex')

# g1.join()
# g2.join()
gevent.joinall([g1,g2])


 #无法识别,不能切换 from gevent import monkey;monkey.patch_all()
import gevent
import time

def eat(name):
    print('%s eat 1' %name)
    time.sleep(5)         #无法识别,不能切换
    print('%s eat 2' %name)
def play(name):
    print('%s play 1' %name)
    time.sleep(3)
    print('%s play 2' %name)

g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,'alex')

# g1.join()
# g2.join()
gevent.joinall([g1,g2])

 

3、若想要实现系统定义的IO切换需加上       

import monkey;monkey.patch_all()

 

eg:

from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(5)
    print('%s eat 2' %current_thread().name)
def play():
    print('%s play 1' %current_thread().name)
    time.sleep(3)
    print('%s play 2' %current_thread().name)

g1=gevent.spawn(eat)
g2=gevent.spawn(play)

# gevent.sleep(100)
# g1.join()
# g2.join()
print(current_thread().name)
gevent.joinall([g1,g2])