python消息队列Queue(1)

时间:2021-01-24 17:40:29

实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”

[python]  view plain  copy
  1. #coding=utf-8  
  2. from multiprocessing import Queue   
  3.    
  4. q = Queue(3)#初始化一个Queue对象,最多可接收三条put消息  
  5. q.put('message-1')  
  6. q.put('message-2')  
  7. print(q.full())#False,是否满了  
  8. q.put('message-3')  
  9. print(q.full())#True  
  10.    
  11. #因为消息队列已满,下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个try会立即抛出异常  
  12. try:  
  13.     q.put('message-4',True,2)  
  14. except:  
  15.     print('except1,消息队列已满,现有消息数量:%s'%q.qsize())  
  16.    
  17. try:  
  18.     q.put_nowait('message-4')  
  19. except:  
  20.     print('except2,消息队列已满,现有消息数量:%s'%q.qsize())  
  21.    
  22. #判断队列是否已满  
  23. if not q.full():  
  24.     q.put_nowait('message-4')  
  25.    
  26. #读取消息时,先判断消息队列是否为空,在读取  
  27. if not q.empty():  
  28.     for i in range(q.qsize()):  
  29.         print(q.get())#q.get会阻塞,q.get_nowait()不阻塞,但会抛异常     

False

True

except1,消息队列已满,现有消息数量:3

except2,消息队列已满,现有消息数量:3

message-1

message-2

message-3

实例二:通过Process进程间通信


[python]  view plain  copy
  1. from multiprocessing import Process,Queue  
  2. import os,time,random   
  3.    
  4. #写数据  
  5. def write(q):  
  6.     for value in ['A','B','C']:  
  7.         print('Put %s to queue...'%value)  
  8.         q.put(value)  
  9.         time.sleep(random.random())  
  10.    
  11. #读数据  
  12. def read(q):  
  13.     while True:  
  14.         if not q.empty():  
  15.             value = q.get(True)  
  16.             print('Get %s from queue...'%value)  
  17.             time.sleep(random.random())  
  18.         else:  
  19.             break  
  20.    
  21. if __name__ == '__main__':  
  22.     print('start...')  
  23.     q = Queue()  
  24.     #父进程的queue传递给子进程  
  25.     pw = Process(target=write,args=(q,))  
  26.     pr = Process(target=read,args=(q,))                
  27.     #写进程  
  28.     pw.start()  
  29.     pw.join()  
  30.     #读进程  
  31.     pr.start()  
  32.     pr.join()  
  33.     print('done...')  

start...

Put A to queue...

Put B to queue...

Put C to queue...

Get A from queue...

Get B from queue...

Get C from queue...

done...

实例三:通过Manager进程间通信
[python]  view plain  copy
  1. from multiprocessing import Manager,Pool  
  2. import os,time,random   
  3.    
  4. #写数据  
  5. def writer(q):  
  6.     print('writer启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))  
  7.     for i in 'chaoge':  
  8.         q.put(i)  
  9.    
  10. #读数据  
  11. def reader(q):  
  12.     print('reader启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))  
  13.     for i in range(q.qsize()):  
  14.         print('reader 从Queue获取到消息:%s'%q.get())  
  15.    
  16.    
  17. if __name__ == '__main__':  
  18.     print('(%s) start'%os.getpid())  
  19.     q = Manager().Queue()#使用Manager中的Queue来初始化  
  20.     po=Pool()  
  21.     #使用阻塞模式创建进程,这样就不需要再reader中使用死循环了,可以等write执行完成后,再用reader  
  22.     po.apply(writer,(q,))  
  23.     po.apply(reader,(q,))  
  24.     #写进程  
  25.     po.close()  
  26.     po.join()  
  27.     print('(%s) End'%os.getpid())  

(7720) start

writer启动(7284),父进程为(7720)

reader启动(8712),父进程为(7720)

reader 从Queue获取到消息:c

reader 从Queue获取到消息:h

reader 从Queue获取到消息:a

reader 从Queue获取到消息:o

reader 从Queue获取到消息:g

reader 从Queue获取到消息:e

(7720) End