队列(queue)
队列只在多线程里有意义,是一种线程安全的数据结构。
get与put方法
''' 创建一个“队列”对象 import queue
q = queue.Queue(maxsize = 10)
queue.Queue类即是一个队列的同步实现。
队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中:
q.put()
调用队列对象的put()方法在队尾插入一个项目。
put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。如果队列当前为空且block为True,put()方法就使调用线程暂停,直到空出一个数据单元。
如果block为False,put方法将引发Full异常。
import queue q=queue.Queue(3) q.put(11)
q.put(22)
q.put(33)
q.put(44,False) #queue.Full ==q.put_nowait()
将一个值从队列中取出
q.get() 调用队列对象的get()方法从队头删除并返回一个项目。
可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
import queue q=queue.Queue(3) q.put(11)
q.put(22)
q.put(33) q.get()
q.get()
q.get()
q.get(False) # queue.Empty == q.get_nowait()
'''
join与task_done方法
join() 阻塞进程,直到所有任务完成,需要配合另一个方法task_done。
task_done() 表示某个任务完成。每一条get语句后需要一条task_done。
import queue,threading q=queue.Queue() def foo():
q.put(11)
q.put(22)
q.put(33)
q.join()
print('ok') def bar():
print(q.get())
q.task_done()
print(q.get())
q.task_done()
print(q.get())
q.task_done() t1=threading.Thread(target=foo)
t1.start() t2=threading.Thread(target=bar)
t2.start()
运行效果:
11
22
33
ok
'''
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)非阻塞
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
'''
其他模式
''' Python Queue模块有三种队列及构造函数: 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) import queue #先进后出 q=queue.LifoQueue() q.put(34)
q.put(56)
q.put(12) #优先级
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}]) while 1:
data=q.get()
print(data) '''
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
生产者:生产数据的模型
消费者:获取数据的模型
生产者消费者模型优点:
1.解耦合
2.实现并发
import threading,queue,random,time def producer():
count=1
while count<10:
baozi=random.randint(1,100)
q.put(baozi)
print('baozi %s 做好了'%baozi)
time.sleep(1)
count+=1 def consumer(id):
while 1:
baozi=q.get()
time.sleep(2)
print('顾客%s吃了包子%s'%(id,baozi)) if __name__ =='__main__':
q=queue.Queue()
t1=threading.Thread(target=producer)
t1.start() for i in range(3):
t=threading.Thread(target=consumer,args=(i,))
t.start()
运行效果:(部分)
baozi 4 做好了
baozi 79 做好了
顾客0吃了包子4
baozi 58 做好了
顾客1吃了包子79
baozi 28 做好了
multiprocessing模块
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
multiprocessing包是Python中的多进程管理包。
它可以利用multiprocessing.Process对象来创建一个进程。也有start(), run(), join()的方法。
多进程优缺点:
优点:可以利用多核,实现并行运算
缺点:
1.开销太大
2.通信困难
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
import multiprocessing
import time def foo():
print('ok')
time.sleep(2) if __name__ =='__main__':
p=multiprocessing.Process(target=foo)
p.start()
print('ending')
运行结果:
ending
ok
python的进程调用
# Process类调用
from multiprocessing import Process
import os
import time def info(name):
print('name:',name)
print('parent process:',os.getppid())
print('process id:',os.getpid())
print('-------------')
time.sleep(1) def foo(name):
info(name) if __name__ =='__main__':
info('main process line') p1=Process(target= info,args=('alex',))
p2=Process(target=foo,args=('egon',))
p1.start()
p2.start() p1.join()
p2.join() print('ending')
运行效果:
name: main process line
parent process: 3012
process id: 6836
-------------
name: alex
parent process: 6836
process id: 8028
-------------
name: egon
parent process: 6836
process id: 1540
-------------
ending
# 继承Process类调用
from multiprocessing import Process
import time class Myprocess(Process):
def __init__(self):
super(Myprocess,self).__init__() def run(self):
print('hello',self.name,time.ctime())
time.sleep(1) if __name__ == '__main__':
l=[]
for i in range(3):
p=Myprocess()
p.start()
l.append(p) for p in l:
p.join() print('ending') 运行效果:
hello Myprocess-1 Fri Jul 21 16:58:36 2017
hello Myprocess-2 Fri Jul 21 16:58:36 2017
hello Myprocess-3 Fri Jul 21 16:58:36 2017
ending
process类
注意:在windows中Process()必须放到# if __name__ == '__main__':下
原因:
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
这是隐藏对Process()内部调用的原因,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
构造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。(args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号, kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}) 实例方法: is_alive():返回进程是否在运行。 join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():启动进程,并调用该子进程中的p.run() run():strat()调用run方法,如果实例进程时未制定传入target,这start默认执行run()方法。 terminate():不管任务是否完成,立即停止工作进程 属性: daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 name:进程名字。 pid:进程号。
通过tasklist(Win)或者ps -elf |grep(linux)命令检测每一个进程号(PID)对应的进程名
进程间通讯
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
进程对列Queue(队列就是管道加锁实现的)
from multiprocessing import Queue,Process def foo(q):
q.put([11,'hello',True]) if __name__ =='__main__':
q=Queue() p=Process(target=foo,args=(q,))
p.start() print(q.get())
运行效果:
[11, 'hello', True]
管道(pipe)
Pipe()返回的两个连接对象代表管道的两端。
每个连接对象都有send()和recv()方法(等等)。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
注意:send()和recv()方法使用pickle模块对对象进行序列化。
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
dumplex:默认管道是全双工的,如果将duplex设置成False,conn1只能用于接收,conn2只能用于发送。
from multiprocessing import Pipe,Process def foo(sk):
sk.send('hello world')
print(sk.recv()) if __name__ == '__main__':
sock,conn=Pipe()
p=Process(target=foo,args=(sock,))
p.start() print(conn.recv())
conn.send('hi son')
运行效果:
hello world
hi son
manager对象实现数据共享
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
进程间通信应该尽量避免使用本节所讲的共享数据的方式
from multiprocessing import Manager,Process def foo(l,n):
l.append(n**2) if __name__ == '__main__':
manager=Manager()
mlist=manager.list([11,22,33]) l=[]
for i in range(5):
p=Process(target=foo,args=(mlist,i))
p.start()
l.append(p) for i in l:
i.join() print(mlist)
运行效果:
[11, 22, 33, 1, 0, 9, 4, 16]
from multiprocessing import Manager,Process def foo(dic,new_dic):
dic.update(new_dic) if __name__ == '__main__':
manager=Manager()
mdict=manager.dict({'a':1,'b':2}) l=[]
for i in range(3):
p=Process(target=foo,args=(mdict,dict.fromkeys(['c','d','e'],i)))
p.start()
l.append(p) for p in l:
p.join() print(mdict)
运行效果:
{'a': 1, 'b': 2, 'c': 2, 'd': 2, 'e': 2}
进程池
开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目
当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
from multiprocessing import Pool
import time def foo(n,):
print(n)
time.sleep(1) if __name__ == '__main__':
pool=Pool(5)
for i in range(100):
pool.apply_async(func=foo,args=(i,)) pool.close()
pool.join() print('ending')
运行效果:
可以理解该程序为:5人共搬100块砖,动作同步,5人每次共搬5块,休息1秒,继续下一次...共搬20次
协程(重点)
英文名Coroutine。
协程,又称微线程,是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的
yield与协程
首先回顾yield用法:
def foo():
print('foo')
yield
print('foo2') return def bar():
print(bar)
yield
print('bar2') foo() #创建了一个生成器对象,不会执行代码
bar() #创建了一个生成器对象,不会执行代码
def foo():
print('foo')
yield 5
print('foo2') yield 8 gen=foo() ret=next(gen)
print(ret)
res=next(gen)
print(res)
运行效果:
foo
5
foo2
8
def foo():
print('foo')
n=yield 5
print('n',n) #123
print('foo2') yield 8 gen=foo() ret=next(gen)
print(ret) #5
res=gen.send('123')
print(res) #8
运行效果:
foo
5
n 123
foo2
8
基于yield生成器函数实现生产者消费者模型
"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
import time def consumer():
r = ''
while True:
n = yield r # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
if not n:
return
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
next(c) #1、首先调用c.next()启动生成器
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
cr = c.send(n) #2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
#4、produce拿到consumer处理的结果,继续生产下一条消息;
print('[PRODUCER] Consumer return: %s' % cr)
c.close() #5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
if __name__=='__main__':
# 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
c = consumer()
produce(c)
运行效果:
[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
greenlet
from greenlet import greenlet def foo():
print('ok1')
gr2.switch()
print('ok3')
gr2.switch() def bar():
print('ok2')
gr1.switch()
print('ok4') gr1=greenlet(foo)
gr2=greenlet(bar)
gr1.switch()
运行效果:
ok1
ok2
ok3
ok4
gevent模块实现协程(yield、greenlet都无法实现遇到IO操作自动切换到其它协程,就用到了gevent模块(select机制))
由于IO操作非常耗时,经常使程序处于等待状态,gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成
import gevent def foo():
print('running in foo')
gevent.sleep(2) #模拟IO操作
print('switch to foo again') def bar():
print('switch to bar')
gevent.sleep(1)
print('switch to bar again') gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
运行效果:
running in foo
switch to bar
switch to bar again
switch to foo again
g=gevent.spawn()创建一个协程对象g
spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
对比普通函数执行和使用协程后的效果:
from gevent import monkey
monkey.patch_all()
# import gevent
import requests
import time def f(url):
response = requests.get(url)
response_str=response.text
print('get data %s---url[%s]' % (len(response_str), url)) start=time.time() f('https://itk.org/')
f('https://www.github.com/') print(time.time()-start)
运行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
5.5523176193237305
from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time def f(url):
response = requests.get(url)
response_str=response.text
print('get data %s---url[%s]' % (len(response_str), url)) start=time.time() gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://www.github.com/'),
]) print(time.time()-start)
运行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
3.939225435256958
需要强调的是:
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就*交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换
对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点:
- 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
- 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
作业:
第一个作业:使用进程池爬取网页内容,自己往进程池累加进程,找到可以实现效果的最短时间 import re
import requests
from multiprocessing import Pool
import time def run(n):
url='http://www.budejie.com/pic/' + str(n) response = requests.get(url).text
ret = re.compile('<div class="j-r-list-c-desc">.*?<a href=.*?>(?P<article>.*?)</a>.*?</div>', re.S)
obj = ret.findall(response)
print(obj) if __name__ =='__main__':
s=time.time()
pool=Pool()
for i in range(,):
pool.apply_async(func=run,args=(i,)) pool.close()
pool.join()
print('cost time:%s'%(time.time()-s)) 第二个作业:基于协程的爬虫示例:建议使用requests模块(urllib模块不太方便)(基于gevent库)
import re
import requests
import gevent
import time def run(url): response = requests.get(url).text
ret = re.compile('<div class="j-r-list-c-desc">.*?<a href=.*?>(?P<article>.*?)</a>.*?</div>', re.S)
obj = ret.findall(response)
print(obj) if __name__ =='__main__':
s=time.time()
gevent.joinall([gevent.spawn(run,'http://www.budejie.com/pic/'+str(i)) for i in range(1,11)]) print('cost time:%s'%(time.time()-s))
Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程的更多相关文章
-
python线程队列Queue-FIFO(35)
之前的文章中讲解很多关于线程间通信的知识,比如:线程互斥锁lock,线程事件event,线程条件变量condition 等等,这些都是在开发中经常使用的内容,而今天继续给大家讲解一个更重要的知识点 — ...
-
python 线程队列PriorityQueue(优先队列)(37)
在 线程队列Queue / 线程队列LifoQueue 文章中分别介绍了先进先出队列Queue和先进后出队列LifoQueue,而今天给大家介绍的是最后一种:优先队列PriorityQueue,对队列 ...
-
python 线程队列LifoQueue-LIFO(36)
在 python线程队列Queue-FIFO 文章中已经介绍了 先进先出队列Queue,而今天给大家介绍的是第二种:线程队列LifoQueue-LIFO,数据先进后出类型,两者有什么区别呢? 一.队 ...
-
day34 线程池 协程
今日内容: 1. 线程的其他方法 2.线程队列(重点) 3.线程池(重点) 4.协程 1.线程的其他方法 语法: Threading.current_thread() # 当前正在运行的线程对象的一个 ...
-
管道(pipe),进程之间的共享内存(Manager,Value)
1 管道(了解) from multiprocessing import Pipe con1,con2 = Pipe() 管道是不安全的. 管道是用于多进程之间通信的一种方式. 如果在单进程中使用管道 ...
-
python 线程(其他方法,队列,线程池,协程 greenlet模块 gevent模块)
1.线程的其他方法 from threading import Thread,current_thread import time import threading def f1(n): time.s ...
-
python线程+队列(queue)
---恢复内容开始--- python的线程学习 用处 pocpiliang脚本的编写 函数式:调用 _thread 模块中的start_new_thread()函数来产生新线程.语法如下: _thr ...
-
Python多进程之multiprocessing模块和进程池的实现
1.利用multiprocessing可以在主进程中创建子进程,提升效率,下面是multiprocessing创建进程的简单例子,和多线程的使用非常相似 ''' 代码是由主进程里面的主线程从上到下执行 ...
-
python全栈开发 * 线程队列 线程池 协程 * 180731
一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.L ...
随机推荐
-
spring mvc 快速入门
---------- 转自尚学堂 高淇 --------- Spring MVC 背景介绍 Spring 框架提供了构建 Web 应用程序的全功能 MVC 模块.使用 Spring 可插入的 MVC ...
-
Schwarz积分公式
设$f\in H(B(0,R))\cap C(\overline{B(0,R)})$,且$f=u+iv$,则$f$可用其实部表示为 $$f(z)=\frac{1}{2\pi}\int_{0}^{2\p ...
-
笔记之Python网络数据采集
笔记之Python网络数据采集 非原创即采集 一念清净, 烈焰成池, 一念觉醒, 方登彼岸 网络数据采集, 无非就是写一个自动化程序向网络服务器请求数据, 再对数据进行解析, 提取需要的信息 通常, ...
-
CGI初识
---恢复内容开始--- 转自http://www.moon-soft.com/program/bbs/readelite887957.htm 用 C/C++ 写 CGI 程序 小传(zhcharle ...
-
Mysql 数据库表操作
☞ 创建表CREATE TABLE `数据库`.`表` ( `id` INT( 11 ) NOT NULL AUTO_INCREMENT COMMENT '注释',`type_name` VARCHA ...
-
keil uVision4一些使用总结(汉字注释,C关键字等)
近日心血来潮,下载了最新的版的keil,再加上protues ,想弄个虚拟环境.主要原因还是经济问题.电子元件,是要花钱的... 今天遇到些keil uVision 4使用方面的问题,记录下来,方便以 ...
-
jquery中这句 .stop(false,true); 什么意思
.stop 是jQuery中用于控制页面动画效果的方法.运行之后立刻结束当前页面上的动画效果.stop在新版jQuery中添加了2个参数:第一个参数的意思是是否清空动画序列,也就是stop的是当前元素 ...
-
把GIF背景变透明
准备软件: 1.Ps cs4 2.QuickTime Player 7.74 开始: 1. 2.弹出文件选择框,但是发现不能选择GIF格式. 3.没关系,在文件名框输入*.*回车,就发现可以选择GIF ...
-
android sdk 历史版本下载地址
https://developer.android.google.cn/studio/archive#android-studio-3-0?utm_source=androiddevtools& ...
-
BBS(第二天) Django之Admin 自动化管理数据页面 与创建一个用户注册的验证码
1.admin的概念 # Admin是Django自带的一个功能强大的自动化数据管理界面 # 被授权的用户可以直接在Admin中操作数据库 # Django提供了许多针对Admin的定制功能 2. 配 ...